Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 064669923 for ; Tue, 11 Oct 2011 17:43:42 +0000 (UTC) Received: (qmail 15234 invoked by uid 500); 11 Oct 2011 17:43:41 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 15217 invoked by uid 500); 11 Oct 2011 17:43:41 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 15210 invoked by uid 99); 11 Oct 2011 17:43:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Oct 2011 17:43:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Oct 2011 17:43:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6DB382388C06 for ; Tue, 11 Oct 2011 17:43:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1181941 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: client/HBaseLocalityCheck.java master/HMaster.java util/FSUtils.java Date: Tue, 11 Oct 2011 17:43:15 -0000 To: commits@hbase.apache.org From: nspiegelberg@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111011174315.6DB382388C06@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: nspiegelberg Date: Tue Oct 11 17:43:14 2011 New Revision: 1181941 URL: http://svn.apache.org/viewvc?rev=1181941&view=rev Log: Implementing a ThreadPool to parallelize startup region-locality HDFS scans Summary: By using a ThreadPool we can turn the formerly serialized scans of HDFS into a parallel operation. Special care was taken to ensure that if any operation fails, the scans are stopped abruptly and the exception is thrown to a higher level. Test Plan: The mechanism is still the same, so to test that it works, one should just start up the cluster and check with the locality checker that Liyin created, as before. Reviewed By: liyintang Reviewers: liyintang, kannan, nspiegelberg Commenters: kannan, nspiegelberg CC: hbase@lists, kannan, liyintang, nspiegelberg, bogdan Differential Revision: 283312 Task ID: 620114 Conflicts: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1181941&r1=1181940&r2=1181941&view=diff ============================================================================== --- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java (original) +++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java Tue Oct 11 17:43:14 2011 @@ -4,7 +4,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; @@ -14,6 +13,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.client.HBaseFsck.HbckInfo; @@ -54,8 +54,10 @@ public class HBaseLocalityCheck { LOG.info("Locality information by region"); // Get the locality info for each region by scanning the file system - preferredRegionToRegionServerMapping = - FSUtils.getRegionLocalityMappingFromFS(fs, rootdir); + preferredRegionToRegionServerMapping = FSUtils + .getRegionLocalityMappingFromFS(fs, rootdir, + conf.getInt("hbase.client.localityCheck.threadPoolSize", 2), + conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 60 * 1000)); Map tableToRegionCountMap = new HashMap(); Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181941&r1=1181940&r2=1181941&view=diff ============================================================================== --- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 17:43:14 2011 @@ -36,8 +36,7 @@ import java.util.concurrent.atomic.Atomi import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; @@ -45,6 +44,7 @@ import org.apache.commons.cli.ParseExcep import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.ClusterSt import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HConstants.Modify; import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -65,17 +66,16 @@ import org.apache.hadoop.hbase.MasterNot import org.apache.hadoop.hbase.MiniZooKeeperCluster; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.HConstants.Modify; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; -import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; -import org.apache.hadoop.hbase.executor.HBaseExecutorService; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.hbase.executor.HBaseExecutorService; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; @@ -428,6 +428,7 @@ public class HMaster extends Thread impl return this.address; } + @Override public long getProtocolVersion(String protocol, long clientVersion) { return HBaseRPCProtocolVersion.versionID; } @@ -634,10 +635,12 @@ public class HMaster extends Thread impl 1 * 60 * 1000); LOG.debug("get preferredRegionToHostMapping; expecting pause here"); try { - this.preferredRegionToRegionServerMapping = - FSUtils.getRegionLocalityMappingFromFS(fs, rootdir); + this.preferredRegionToRegionServerMapping = FSUtils + .getRegionLocalityMappingFromFS(fs, rootdir, + conf.getInt("hbase.master.localityCheck.threadPoolSize", 5), + threadWakeFrequency); } catch (Exception e) { - LOG.equals("Got unexpected exception when getting " + + LOG.error("Got unexpected exception when getting " + "preferredRegionToHostMapping : " + e.toString()); // do not pause the master's construction preferredRegionToRegionServerMapping = null; @@ -815,6 +818,7 @@ public class HMaster extends Thread impl this.serverManager.notifyServers(); } + @Override public MapWritable regionServerStartup(final HServerInfo serverInfo) throws IOException { // Set the ip into the passed in serverInfo. Its ip is more than likely @@ -845,6 +849,7 @@ public class HMaster extends Thread impl return mw; } + @Override public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], HRegionInfo[] mostLoadedRegions) throws IOException { @@ -864,16 +869,19 @@ public class HMaster extends Thread impl return msgs; } + @Override public boolean isMasterRunning() { return !this.closed.get(); } + @Override public void shutdown() { LOG.info("Cluster shutdown requested. Starting to quiesce servers"); this.shutdownRequested.set(true); this.zooKeeperWrapper.setClusterState(false); } + @Override public void createTable(HTableDescriptor desc, byte [][] splitKeys) throws IOException { if (!isMasterRunning()) { @@ -952,6 +960,7 @@ public class HMaster extends Thread impl regionManager.metaScannerThread.triggerNow(); } + @Override public void deleteTable(final byte [] tableName) throws IOException { if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { throw new IOException("Can't delete root table"); @@ -960,22 +969,26 @@ public class HMaster extends Thread impl LOG.info("deleted table: " + Bytes.toString(tableName)); } + @Override public void addColumn(byte [] tableName, HColumnDescriptor column) throws IOException { new AddColumn(this, tableName, column).process(); } + @Override public void modifyColumn(byte [] tableName, byte [] columnName, HColumnDescriptor descriptor) throws IOException { new ModifyColumn(this, tableName, columnName, descriptor).process(); } + @Override public void deleteColumn(final byte [] tableName, final byte [] c) throws IOException { new DeleteColumn(this, tableName, KeyValue.parseColumn(c)[0]).process(); } + @Override public void enableTable(final byte [] tableName) throws IOException { if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { throw new IOException("Can't enable root table"); @@ -983,6 +996,7 @@ public class HMaster extends Thread impl new ChangeTableState(this, tableName, true).process(); } + @Override public void disableTable(final byte [] tableName) throws IOException { if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { throw new IOException("Can't disable root table"); @@ -1130,6 +1144,7 @@ public class HMaster extends Thread impl return this.connection.getHTableDescriptor(tableName); } + @Override public void modifyTable(final byte[] tableName, HConstants.Modify op, Writable[] args) throws IOException { @@ -1270,6 +1285,7 @@ public class HMaster extends Thread impl /** * @return cluster status */ + @Override public ClusterStatus getClusterStatus() { ClusterStatus status = new ClusterStatus(); status.setHBaseVersion(VersionInfo.getVersion()); Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181941&r1=1181940&r2=1181941&view=diff ============================================================================== --- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original) +++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11 17:43:14 2011 @@ -19,6 +19,23 @@ */ package org.apache.hadoop.hbase.util; +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,22 +57,6 @@ import org.apache.hadoop.hdfs.protocol.F import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.SequenceFile; -import java.io.DataInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.InvocationTargetException; -import java.io.PrintStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - /** * Utility methods for interacting with the underlying file system. */ @@ -572,6 +573,7 @@ public class FSUtils { this.fs = fs; } + @Override public boolean accept(Path p) { boolean isdir = false; try { @@ -702,99 +704,126 @@ public class FSUtils { * mapping between the region name and its best locality region server * * @param fs + * the file system to use * @param rootPath - * @param out - * @return + * the root path to start from + * @param threadPoolSize + * the thread pool size to use + * @param threadWakeFrequency + * the wake frequency to perform stat printing + * @return the mapping to consider as best possible assignment * @throws IOException + * in case of file system errors or interrupts */ - public static Map getRegionLocalityMappingFromFS( - final FileSystem fs, final Path rootPath) + public static Map getRegionLocalityMappingFromFS( + final FileSystem fs, final Path rootPath, int threadPoolSize, + final int threadWakeFrequency) throws IOException { // region name to its best locality region server mapping Map regionToBestLocalityRSMapping = - new HashMap(); - // keep the most block count mapping - HashMap blockCountMap = - new HashMap(); + new ConcurrentHashMap(); long startTime = System.currentTimeMillis(); Path queryPath = new Path(rootPath.toString() + "/*/*/"); FileStatus[] statusList = fs.globStatus(queryPath); + LOG.debug("Query Path: " + queryPath + " ; # list of files: " + statusList.length); - if (statusList == null) { - return regionToBestLocalityRSMapping; - } - for (FileStatus regionStatus : statusList) { - if(!regionStatus.isDir()) { - continue; - } + if (null == statusList) { + return regionToBestLocalityRSMapping; + } - // get the region name; it may get some noise data - Path regionPath = regionStatus.getPath(); - String regionName = regionPath.getName(); - if (!regionName.toLowerCase().matches("[0-9a-f]+")) { - continue; + // run in multiple threads + int totalGoodRegionFiles = 0; + ThreadPoolExecutor tpe = null; + FSRegionScanner[] parallelTasks = null; + try { + // lower the number of threads in case we have very few expected regions + int maxRegions = statusList.length; + + if (maxRegions < threadPoolSize) { + threadPoolSize = maxRegions; } - // ignore the empty directory - FileStatus[] cfList = fs.listStatus(regionPath); - if (cfList == null) { - continue; - } - - //get table name - String tableName = regionPath.getParent().getName(); - int totalBlkCount = 0; - blockCountMap.clear(); - - // for each cf, get all the blocks information - for (FileStatus cfStatus : cfList) { - if (!cfStatus.isDir()) { - // skip because this is not a CF directory + + // initialize executor service + tpe = new ThreadPoolExecutor(threadPoolSize, + threadPoolSize, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue( + threadPoolSize)); + + // set defaults + FSRegionScanner.setFileSystem(fs); + FSRegionScanner + .setRegionToBestLocalityRSMapping(regionToBestLocalityRSMapping); + + // start initializing "thread pool" and threads + parallelTasks = new FSRegionScanner[threadPoolSize]; + ArrayList> buckets = new ArrayList>(); + for (int i = 0; i < threadPoolSize; ++i) { + // create buckets for each thread + LinkedList bucket = new LinkedList(); + buckets.add(bucket); + parallelTasks[i] = new FSRegionScanner(bucket); + } + + // ignore all file status items that are not of interest + int current = 0; + for (FileStatus regionStatus : statusList) { + if (null == regionStatus) { continue; } - FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath()); - if (storeFileLists == null) { - continue; - } - for (FileStatus storeFile : storeFileLists) { - BlockLocation[] blkLocations = - fs.getFileBlockLocations(storeFile, 0, storeFile.getLen()); - if (blkLocations == null) { - continue; - } - totalBlkCount += blkLocations.length; - for(BlockLocation blk: blkLocations) { - for (String host: blk.getHosts()) { - AtomicInteger count = blockCountMap.get(host); - if (count == null) { - count = new AtomicInteger(0); - blockCountMap.put(host, count); - } - count.incrementAndGet(); - } - } + + if (!regionStatus.isDir()) { + continue; } - } - int largestBlkCount = 0; - String hostToRun = null; - for (String host: blockCountMap.keySet()) { - int tmp = blockCountMap.get(host).get(); - if (tmp > largestBlkCount) { - largestBlkCount = tmp; - hostToRun = host; + // get the region name; it may get some noise data + Path regionPath = regionStatus.getPath(); + String regionName = regionPath.getName(); + if (!regionName.toLowerCase().matches("[0-9a-f]+")) { + continue; } - } - if (hostToRun.endsWith(".")) { - hostToRun = hostToRun.substring(0, hostToRun.length()-1); - } - String name = tableName + ":" + regionName; - regionToBestLocalityRSMapping.put(name,hostToRun); + // add to respective bucket, do round-robin additions to make sure all + // threads get things to do; can create empty buckets in the rare case + // in which we end up getting less region paths than we have threads to + // handle them + buckets.get(current % threadPoolSize).add(regionPath); + ++totalGoodRegionFiles; + ++current; + } + + // start each thread in executor service + for (FSRegionScanner task : parallelTasks) { + tpe.execute(task); + } + } finally { + if (null != tpe && null != parallelTasks) { + tpe.shutdown(); + try { + // here we wait until TPE terminates, which is either naturally or by + // exceptions in the execution of the threads + while (!tpe.awaitTermination(threadWakeFrequency, + TimeUnit.MILLISECONDS)) { + int filesDone = 0; + for (FSRegionScanner rs : parallelTasks) { + filesDone += rs.getNumberOfFinishedRegions(); + } + // printing out rough estimate, so as to not introduce + // AtomicInteger + LOG.info("Locality checking is underway: { THREADS : " + + tpe.getCompletedTaskCount() + "/" + parallelTasks.length + + " , Scanned Regions : " + filesDone + "/" + + totalGoodRegionFiles + " }"); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } } + long overhead = System.currentTimeMillis() - startTime; String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms"; @@ -803,3 +832,143 @@ public class FSUtils { } } + +/** + * Thread to be used for + */ +class FSRegionScanner implements Runnable { + /** + * The shared block count map + */ + private HashMap blockCountMap; + + /** + * The file system used + */ + static private FileSystem fs; + + static void setFileSystem(FileSystem fs) { + FSRegionScanner.fs = fs; + } + + /** + * The locality mapping returned by the above getRegionLocalityMappingFromFS + * method + */ + static private Map regionToBestLocalityRSMapping; + + static void setRegionToBestLocalityRSMapping( + Map regionToBestLocalityRSMapping) { + FSRegionScanner.regionToBestLocalityRSMapping = + regionToBestLocalityRSMapping; + } + + /** + * The respective paths to analyze for each thread + */ + private LinkedList paths; + + /** + * Number of finished blocks by now + */ + private int numerOfFinishedRegions; + + public int getNumberOfFinishedRegions() { + return numerOfFinishedRegions; + } + + FSRegionScanner(LinkedList paths) { + this.paths = paths; + this.numerOfFinishedRegions = 0; + this.blockCountMap = new HashMap(); + } + + @Override + public void run() { + try { + for (Path regionPath : paths) { + try { + + // break here in case this for loop selects some of the null-pading + // elements at the end of the receiving arrays + if (null == regionPath) { + break; + } + //get table name + String tableName = regionPath.getParent().getName(); + int totalBlkCount = 0; + + // ignore null + FileStatus[] cfList = fs.listStatus(regionPath); + if (null == cfList) { + continue; + } + + // for each cf, get all the blocks information + for (FileStatus cfStatus : cfList) { + if (!cfStatus.isDir()) { + // skip because this is not a CF directory + continue; + } + FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath()); + if (null == storeFileLists) { + continue; + } + + for (FileStatus storeFile : storeFileLists) { + BlockLocation[] blkLocations = + fs.getFileBlockLocations(storeFile, 0, storeFile.getLen()); + if (null == blkLocations) { + continue; + } + + totalBlkCount += blkLocations.length; + for(BlockLocation blk: blkLocations) { + for (String host: blk.getHosts()) { + AtomicInteger count = blockCountMap.get(host); + if (count == null) { + count = new AtomicInteger(0); + blockCountMap.put(host, count); + } + count.incrementAndGet(); + } + } + } + } + + int largestBlkCount = 0; + String hostToRun = null; + for (String host: blockCountMap.keySet()) { + int tmp = blockCountMap.get(host).get(); + if (tmp > largestBlkCount) { + largestBlkCount = tmp; + hostToRun = host; + } + } + + // empty regions could make this null + if (null == hostToRun) { + continue; + } + + if (hostToRun.endsWith(".")) { + hostToRun = hostToRun.substring(0, hostToRun.length()-1); + } + String name = tableName + ":" + regionPath.getName(); + regionToBestLocalityRSMapping.put(name,hostToRun); + + this.numerOfFinishedRegions++; + } catch (IOException e) { + continue; + } catch (RuntimeException e) { + continue; + } + } + } finally { + // sanity check + assert this.numerOfFinishedRegions <= this.paths.size(); + } + } + // we can use the error mechanism to stop the whole process if we want, + // instead of continuing on errors... +}