hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1345889 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Date Mon, 04 Jun 2012 09:40:55 GMT
Author: jmhsieh
Date: Mon Jun  4 09:40:55 2012
New Revision: 1345889

URL: http://svn.apache.org/viewvc?rev=1345889&view=rev
Log:
HBASE-5892 [hbck] Refactor parallel WorkItem* to Futures (Andrew Wang)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1345889&r1=1345888&r2=1345889&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Mon Jun  4 09:40:55 2012
@@ -32,10 +32,11 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -152,7 +153,7 @@ public class HBaseFsck {
   private HConnection connection;
   private HBaseAdmin admin;
   private HTable meta;
-  private ThreadPoolExecutor executor; // threads to retrieve data from regionservers
+  private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers
   private long startMillis = System.currentTimeMillis();
 
   /***********
@@ -218,10 +219,7 @@ public class HBaseFsck {
     this.conf = conf;
 
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
-    executor = new ThreadPoolExecutor(numThreads, numThreads,
-        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>());
-    executor.allowCoreThreadTimeOut(true);
+    executor = new ScheduledThreadPoolExecutor(numThreads);
   }
 
   /**
@@ -629,20 +627,25 @@ public class HBaseFsck {
     Collection<HbckInfo> hbckInfos = regionInfoMap.values();
 
     // Parallelized read of .regioninfo files.
-    WorkItemHdfsRegionInfo[] hbis = new WorkItemHdfsRegionInfo[hbckInfos.size()];
-    int num = 0;
+    List<WorkItemHdfsRegionInfo> hbis = new ArrayList<WorkItemHdfsRegionInfo>(hbckInfos.size());
+    List<Future<Void>> hbiFutures;
+
     for (HbckInfo hbi : hbckInfos) {
-      hbis[num] = new WorkItemHdfsRegionInfo(hbi, this, errors);
-      executor.execute(hbis[num]);
-      num++;
+      WorkItemHdfsRegionInfo work = new WorkItemHdfsRegionInfo(hbi, this, errors);
+      hbis.add(work);
     }
 
-    for (int i=0; i < num; i++) {
-      WorkItemHdfsRegionInfo hbi = hbis[i];
-      synchronized(hbi) {
-        while (!hbi.isDone()) {
-          hbi.wait();
-        }
+    // Submit and wait for completion
+    hbiFutures = executor.invokeAll(hbis);
+
+    for(int i=0; i<hbiFutures.size(); i++) {
+      WorkItemHdfsRegionInfo work = hbis.get(i);
+      Future<Void> f = hbiFutures.get(i);
+      try {
+        f.get();
+      } catch(ExecutionException e) {
+        LOG.warn("Failed to read .regioninfo file for region " +
+              work.hbi.getRegionNameAsString(), e.getCause());
       }
     }
 
@@ -1054,22 +1057,22 @@ public class HBaseFsck {
     }
 
     // level 1:  <HBASE_DIR>/*
-    WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];
-    int num = 0;
+    List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size());
+    List<Future<Void>> dirsFutures;
+
     for (FileStatus tableDir : tableDirs) {
       LOG.debug("Loading region dirs from " +tableDir.getPath());
-      dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir);
-      executor.execute(dirs[num]);
-      num++;
+      dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir));
     }
 
-    // wait for all directories to be done
-    for (int i = 0; i < num; i++) {
-      WorkItemHdfsDir dir = dirs[i];
-      synchronized (dir) {
-        while (!dir.isDone()) {
-          dir.wait();
-        }
+    // Invoke and wait for Callables to complete
+    dirsFutures = executor.invokeAll(dirs);
+
+    for(Future<Void> f: dirsFutures) {
+      try {
+        f.get();
+      } catch(ExecutionException e) {
+        LOG.warn("Could not load region dir " , e.getCause());
       }
     }
   }
@@ -1136,22 +1139,24 @@ public class HBaseFsck {
   void processRegionServers(Collection<ServerName> regionServerList)
     throws IOException, InterruptedException {
 
-    WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
-    int num = 0;
+    List<WorkItemRegion> workItems = new ArrayList<WorkItemRegion>(regionServerList.size());
+    List<Future<Void>> workFutures;
 
     // loop to contact each region server in parallel
     for (ServerName rsinfo: regionServerList) {
-      work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
-      executor.execute(work[num]);
-      num++;
+      workItems.add(new WorkItemRegion(this, rsinfo, errors, connection));
     }
     
-    // wait for all submitted tasks to be done
-    for (int i = 0; i < num; i++) {
-      synchronized (work[i]) {
-        while (!work[i].isDone()) {
-          work[i].wait();
-        }
+    workFutures = executor.invokeAll(workItems);
+
+    for(int i=0; i<workFutures.size(); i++) {
+      WorkItemRegion item = workItems.get(i);
+      Future<Void> f = workFutures.get(i);
+      try {
+        f.get();
+      } catch(ExecutionException e) {
+        LOG.warn("Could not process regionserver " + item.rsinfo.getHostAndPort(),
+            e.getCause());
       }
     }
   }
@@ -2366,10 +2371,11 @@ public class HBaseFsck {
       if (metaEntry != null) {
         return metaEntry.getRegionNameAsString();
       } else if (hdfsEntry != null) {
-        return hdfsEntry.hri.getRegionNameAsString();
-      } else {
-        return null;
+        if (hdfsEntry.hri != null) {
+          return hdfsEntry.hri.getRegionNameAsString();
+        }
       }
+      return null;
     }
 
     public byte[] getRegionName() {
@@ -2618,12 +2624,11 @@ public class HBaseFsck {
   /**
    * Contact a region server and get all information from it
    */
-  static class WorkItemRegion implements Runnable {
+  static class WorkItemRegion implements Callable<Void> {
     private HBaseFsck hbck;
     private ServerName rsinfo;
     private ErrorReporter errors;
     private HConnection connection;
-    private boolean done;
 
     WorkItemRegion(HBaseFsck hbck, ServerName info,
                    ErrorReporter errors, HConnection connection) {
@@ -2631,16 +2636,10 @@ public class HBaseFsck {
       this.rsinfo = info;
       this.errors = errors;
       this.connection = connection;
-      this.done = false;
-    }
-
-    // is this task done?
-    synchronized boolean isDone() {
-      return done;
     }
 
     @Override
-    public synchronized void run() {
+    public synchronized Void call() throws IOException {
       errors.progress();
       try {
         HRegionInterface server =
@@ -2671,10 +2670,9 @@ public class HBaseFsck {
       } catch (IOException e) {          // unable to connect to the region server. 
         errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() +
           " Unable to fetch region information. " + e);
-      } finally {
-        done = true;
-        notifyAll(); // wakeup anybody waiting for this item to be done
+        throw e;
       }
+      return null;
     }
 
     private List<HRegionInfo> filterOnlyMetaRegions(List<HRegionInfo> regions) {
@@ -2692,12 +2690,11 @@ public class HBaseFsck {
    * Contact hdfs and get all information about specified table directory into
    * regioninfo list.
    */
-  static class WorkItemHdfsDir implements Runnable {
+  static class WorkItemHdfsDir implements Callable<Void> {
     private HBaseFsck hbck;
     private FileStatus tableDir;
     private ErrorReporter errors;
     private FileSystem fs;
-    private boolean done;
 
     WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, 
                     FileStatus status) {
@@ -2705,27 +2702,25 @@ public class HBaseFsck {
       this.fs = fs;
       this.tableDir = status;
       this.errors = errors;
-      this.done = false;
     }
 
-    synchronized boolean isDone() {
-      return done;
-    } 
-
     @Override
-    public synchronized void run() {
+    public synchronized Void call() throws IOException {
       try {
         String tableName = tableDir.getPath().getName();
         // ignore hidden files
         if (tableName.startsWith(".") &&
-            !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
-          return;
+            !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) {
+          return null;
+        }
         // level 2: <HBASE_DIR>/<table>/*
         FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
         for (FileStatus regionDir : regionDirs) {
           String encodedName = regionDir.getPath().getName();
           // ignore directories that aren't hexadecimal
-          if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
+          if (!encodedName.toLowerCase().matches("[0-9a-f]+")) {
+            continue;
+          }
 
           LOG.debug("Loading region info from hdfs:"+ regionDir.getPath());
           HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
@@ -2762,10 +2757,9 @@ public class HBaseFsck {
         errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
             + tableDir.getPath().getName()
             + " Unable to fetch region information. " + e);
-      } finally {
-        done = true;
-        notifyAll();
+        throw e;
       }
+      return null;
     }
   }
 
@@ -2773,51 +2767,41 @@ public class HBaseFsck {
    * Contact hdfs and get all information about specified table directory into
    * regioninfo list.
    */
-  static class WorkItemHdfsRegionInfo implements Runnable {
+  static class WorkItemHdfsRegionInfo implements Callable<Void> {
     private HbckInfo hbi;
     private HBaseFsck hbck;
     private ErrorReporter errors;
-    private boolean done;
 
     WorkItemHdfsRegionInfo(HbckInfo hbi, HBaseFsck hbck, ErrorReporter errors) {
       this.hbi = hbi;
       this.hbck = hbck;
       this.errors = errors;
-      this.done = false;
-    }
-
-    synchronized boolean isDone() {
-      return done;
     }
 
     @Override
-    public synchronized void run() {
-      try {
-        // only load entries that haven't been loaded yet.
-        if (hbi.getHdfsHRI() == null) {
+    public synchronized Void call() throws IOException {
+      // only load entries that haven't been loaded yet.
+      if (hbi.getHdfsHRI() == null) {
+        try {
+          hbck.loadHdfsRegioninfo(hbi);
+        } catch (IOException ioe) {
+          String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
+              + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
+              + hbi.getHdfsRegionDir()
+              + "!  It may be an invalid format or version file.  Treating as "
+              + "an orphaned regiondir.";
+          errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
           try {
-            hbck.loadHdfsRegioninfo(hbi);
-          } catch (IOException ioe) {
-            String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
-                + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
-                + hbi.getHdfsRegionDir()
-                + "!  It may be an invalid format or version file.  Treating as "
-                + "an orphaned regiondir.";
-            errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
-            try {
-              hbck.debugLsr(hbi.getHdfsRegionDir());
-            } catch (IOException ioe2) {
-              LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
-              return; // TODO convert this in to a future
-            }
-            hbck.orphanHdfsDirs.add(hbi);
-            return;
+            hbck.debugLsr(hbi.getHdfsRegionDir());
+          } catch (IOException ioe2) {
+            LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
+            throw ioe2;
           }
+          hbck.orphanHdfsDirs.add(hbi);
+          throw ioe;
         }
-      } finally {
-        done = true;
-        notifyAll();
       }
+      return null;
     }
   };
 



Mime
View raw message