Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DF80E18765 for ; Mon, 22 Feb 2016 16:22:28 +0000 (UTC) Received: (qmail 10739 invoked by uid 500); 22 Feb 2016 16:22:28 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 10480 invoked by uid 500); 22 Feb 2016 16:22:28 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 8971 invoked by uid 99); 22 Feb 2016 16:22:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 16:22:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3490E03C2; Mon, 22 Feb 2016 16:22:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Mon, 22 Feb 2016 16:22:35 -0000 Message-Id: In-Reply-To: <5477148816384a289eb664741b93dfba@git.apache.org> References: <5477148816384a289eb664741b93dfba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/51] [partial] hbase-site git commit: Published site at e58c0385a738df63fa3fff287e1ddcfe6da1d046. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f6cc9224/devapidocs/src-html/org/apache/hadoop/hbase/tool/Canary.ExtendedSink.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/tool/Canary.ExtendedSink.html b/devapidocs/src-html/org/apache/hadoop/hbase/tool/Canary.ExtendedSink.html index dac2d4d..9c063a3 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/tool/Canary.ExtendedSink.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/tool/Canary.ExtendedSink.html @@ -102,1116 +102,1164 @@ 094public final class Canary implements Tool { 095 // Sink interface used by the canary to outputs information 096 public interface Sink { -097 public void publishReadFailure(HRegionInfo region, Exception e); -098 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e); -099 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime); -100 public void publishWriteFailure(HRegionInfo region, Exception e); -101 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e); -102 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime); -103 } -104 // new extended sink for output regionserver mode info -105 // do not change the Sink interface directly due to maintaining the API -106 public interface ExtendedSink extends Sink { -107 public void publishReadFailure(String table, String server); -108 public void publishReadTiming(String table, String server, long msTime); -109 } -110 -111 // Simple implementation of canary sink that allows to plot on -112 // file or standard output timings or failures. -113 public static class StdOutSink implements Sink { -114 @Override -115 public void publishReadFailure(HRegionInfo region, Exception e) { -116 LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e); -117 } -118 -119 @Override -120 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) { -121 LOG.error(String.format("read from region %s column family %s failed", -122 region.getRegionNameAsString(), column.getNameAsString()), e); +097 public long getReadFailureCount(); +098 public long incReadFailureCount(); +099 public void publishReadFailure(HRegionInfo region, Exception e); +100 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e); +101 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime); +102 public long getWriteFailureCount(); +103 public void publishWriteFailure(HRegionInfo region, Exception e); +104 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e); +105 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime); +106 } +107 // new extended sink for output regionserver mode info +108 // do not change the Sink interface directly due to maintaining the API +109 public interface ExtendedSink extends Sink { +110 public void publishReadFailure(String table, String server); +111 public void publishReadTiming(String table, String server, long msTime); +112 } +113 +114 // Simple implementation of canary sink that allows to plot on +115 // file or standard output timings or failures. +116 public static class StdOutSink implements Sink { +117 private AtomicLong readFailureCount = new AtomicLong(0), +118 writeFailureCount = new AtomicLong(0); +119 +120 @Override +121 public long getReadFailureCount() { +122 return readFailureCount.get(); 123 } 124 125 @Override -126 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) { -127 LOG.info(String.format("read from region %s column family %s in %dms", -128 region.getRegionNameAsString(), column.getNameAsString(), msTime)); -129 } -130 -131 @Override -132 public void publishWriteFailure(HRegionInfo region, Exception e) { -133 LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e); +126 public long incReadFailureCount() { +127 return readFailureCount.incrementAndGet(); +128 } +129 +130 @Override +131 public void publishReadFailure(HRegionInfo region, Exception e) { +132 readFailureCount.incrementAndGet(); +133 LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e); 134 } 135 136 @Override -137 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) { -138 LOG.error(String.format("write to region %s column family %s failed", -139 region.getRegionNameAsString(), column.getNameAsString()), e); -140 } -141 -142 @Override -143 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) { -144 LOG.info(String.format("write to region %s column family %s in %dms", -145 region.getRegionNameAsString(), column.getNameAsString(), msTime)); -146 } -147 } -148 // a ExtendedSink implementation -149 public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink { -150 -151 @Override -152 public void publishReadFailure(String table, String server) { -153 LOG.error(String.format("Read from table:%s on region server:%s", table, server)); -154 } -155 -156 @Override -157 public void publishReadTiming(String table, String server, long msTime) { -158 LOG.info(String.format("Read from table:%s on region server:%s in %dms", -159 table, server, msTime)); -160 } -161 } -162 -163 /** -164 * For each column family of the region tries to get one row and outputs the latency, or the -165 * failure. -166 */ -167 static class RegionTask implements Callable<Void> { -168 public enum TaskType{ -169 READ, WRITE -170 } -171 private Connection connection; -172 private HRegionInfo region; -173 private Sink sink; -174 private TaskType taskType; +137 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) { +138 readFailureCount.incrementAndGet(); +139 LOG.error(String.format("read from region %s column family %s failed", +140 region.getRegionNameAsString(), column.getNameAsString()), e); +141 } +142 +143 @Override +144 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) { +145 LOG.info(String.format("read from region %s column family %s in %dms", +146 region.getRegionNameAsString(), column.getNameAsString(), msTime)); +147 } +148 +149 @Override +150 public long getWriteFailureCount() { +151 return writeFailureCount.get(); +152 } +153 +154 @Override +155 public void publishWriteFailure(HRegionInfo region, Exception e) { +156 writeFailureCount.incrementAndGet(); +157 LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e); +158 } +159 +160 @Override +161 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) { +162 writeFailureCount.incrementAndGet(); +163 LOG.error(String.format("write to region %s column family %s failed", +164 region.getRegionNameAsString(), column.getNameAsString()), e); +165 } +166 +167 @Override +168 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) { +169 LOG.info(String.format("write to region %s column family %s in %dms", +170 region.getRegionNameAsString(), column.getNameAsString(), msTime)); +171 } +172 } +173 // a ExtendedSink implementation +174 public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink { 175 -176 RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) { -177 this.connection = connection; -178 this.region = region; -179 this.sink = sink; -180 this.taskType = taskType; -181 } -182 -183 @Override -184 public Void call() { -185 switch (taskType) { -186 case READ: -187 return read(); -188 case WRITE: -189 return write(); -190 default: -191 return read(); -192 } -193 } -194 -195 public Void read() { -196 Table table = null; -197 HTableDescriptor tableDesc = null; -198 try { -199 if (LOG.isDebugEnabled()) { -200 LOG.debug(String.format("reading table descriptor for table %s", -201 region.getTable())); -202 } -203 table = connection.getTable(region.getTable()); -204 tableDesc = table.getTableDescriptor(); -205 } catch (IOException e) { -206 LOG.debug("sniffRegion failed", e); -207 sink.publishReadFailure(region, e); -208 if (table != null) { -209 try { -210 table.close(); -211 } catch (IOException ioe) { -212 LOG.error("Close table failed", e); -213 } -214 } -215 return null; -216 } -217 -218 byte[] startKey = null; -219 Get get = null; -220 Scan scan = null; -221 ResultScanner rs = null; -222 StopWatch stopWatch = new StopWatch(); -223 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { -224 stopWatch.reset(); -225 startKey = region.getStartKey(); -226 // Can't do a get on empty start row so do a Scan of first element if any instead. -227 if (startKey.length > 0) { -228 get = new Get(startKey); -229 get.setCacheBlocks(false); -230 get.setFilter(new FirstKeyOnlyFilter()); -231 get.addFamily(column.getName()); -232 } else { -233 scan = new Scan(); -234 scan.setRaw(true); -235 scan.setCaching(1); -236 scan.setCacheBlocks(false); -237 scan.setFilter(new FirstKeyOnlyFilter()); -238 scan.addFamily(column.getName()); -239 scan.setMaxResultSize(1L); -240 scan.setSmall(true); -241 } -242 -243 if (LOG.isDebugEnabled()) { -244 LOG.debug(String.format("reading from table %s region %s column family %s and key %s", -245 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), -246 Bytes.toStringBinary(startKey))); -247 } -248 try { -249 stopWatch.start(); -250 if (startKey.length > 0) { -251 table.get(get); -252 } else { -253 rs = table.getScanner(scan); -254 rs.next(); -255 } -256 stopWatch.stop(); -257 sink.publishReadTiming(region, column, stopWatch.getTime()); -258 } catch (Exception e) { -259 sink.publishReadFailure(region, column, e); -260 } finally { -261 if (rs != null) { -262 rs.close(); -263 } -264 scan = null; -265 get = null; -266 startKey = null; +176 @Override +177 public void publishReadFailure(String table, String server) { +178 incReadFailureCount(); +179 LOG.error(String.format("Read from table:%s on region server:%s", table, server)); +180 } +181 +182 @Override +183 public void publishReadTiming(String table, String server, long msTime) { +184 LOG.info(String.format("Read from table:%s on region server:%s in %dms", +185 table, server, msTime)); +186 } +187 } +188 +189 /** +190 * For each column family of the region tries to get one row and outputs the latency, or the +191 * failure. +192 */ +193 static class RegionTask implements Callable<Void> { +194 public enum TaskType{ +195 READ, WRITE +196 } +197 private Connection connection; +198 private HRegionInfo region; +199 private Sink sink; +200 private TaskType taskType; +201 +202 RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) { +203 this.connection = connection; +204 this.region = region; +205 this.sink = sink; +206 this.taskType = taskType; +207 } +208 +209 @Override +210 public Void call() { +211 switch (taskType) { +212 case READ: +213 return read(); +214 case WRITE: +215 return write(); +216 default: +217 return read(); +218 } +219 } +220 +221 public Void read() { +222 Table table = null; +223 HTableDescriptor tableDesc = null; +224 try { +225 if (LOG.isDebugEnabled()) { +226 LOG.debug(String.format("reading table descriptor for table %s", +227 region.getTable())); +228 } +229 table = connection.getTable(region.getTable()); +230 tableDesc = table.getTableDescriptor(); +231 } catch (IOException e) { +232 LOG.debug("sniffRegion failed", e); +233 sink.publishReadFailure(region, e); +234 if (table != null) { +235 try { +236 table.close(); +237 } catch (IOException ioe) { +238 LOG.error("Close table failed", e); +239 } +240 } +241 return null; +242 } +243 +244 byte[] startKey = null; +245 Get get = null; +246 Scan scan = null; +247 ResultScanner rs = null; +248 StopWatch stopWatch = new StopWatch(); +249 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { +250 stopWatch.reset(); +251 startKey = region.getStartKey(); +252 // Can't do a get on empty start row so do a Scan of first element if any instead. +253 if (startKey.length > 0) { +254 get = new Get(startKey); +255 get.setCacheBlocks(false); +256 get.setFilter(new FirstKeyOnlyFilter()); +257 get.addFamily(column.getName()); +258 } else { +259 scan = new Scan(); +260 scan.setRaw(true); +261 scan.setCaching(1); +262 scan.setCacheBlocks(false); +263 scan.setFilter(new FirstKeyOnlyFilter()); +264 scan.addFamily(column.getName()); +265 scan.setMaxResultSize(1L); +266 scan.setSmall(true); 267 } -268 } -269 try { -270 table.close(); -271 } catch (IOException e) { -272 LOG.error("Close table failed", e); -273 } -274 return null; -275 } -276 -277 /** -278 * Check writes for the canary table -279 * @return -280 */ -281 private Void write() { -282 Table table = null; -283 HTableDescriptor tableDesc = null; -284 try { -285 table = connection.getTable(region.getTable()); -286 tableDesc = table.getTableDescriptor(); -287 byte[] rowToCheck = region.getStartKey(); -288 if (rowToCheck.length == 0) { -289 rowToCheck = new byte[]{0x0}; -290 } -291 int writeValueSize = -292 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); -293 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { -294 Put put = new Put(rowToCheck); -295 byte[] value = new byte[writeValueSize]; -296 Bytes.random(value); -297 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); -298 -299 if (LOG.isDebugEnabled()) { -300 LOG.debug(String.format("writing to table %s region %s column family %s and key %s", -301 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), -302 Bytes.toStringBinary(rowToCheck))); -303 } -304 try { -305 long startTime = System.currentTimeMillis(); -306 table.put(put); -307 long time = System.currentTimeMillis() - startTime; -308 sink.publishWriteTiming(region, column, time); -309 } catch (Exception e) { -310 sink.publishWriteFailure(region, column, e); -311 } -312 } -313 table.close(); -314 } catch (IOException e) { -315 sink.publishWriteFailure(region, e); -316 } -317 return null; -318 } -319 } -320 -321 /** -322 * Get one row from a region on the regionserver and outputs the latency, or the failure. -323 */ -324 static class RegionServerTask implements Callable<Void> { -325 private Connection connection; -326 private String serverName; -327 private HRegionInfo region; -328 private ExtendedSink sink; -329 private AtomicLong successes; -330 -331 RegionServerTask(Connection connection, String serverName, HRegionInfo region, -332 ExtendedSink sink, AtomicLong successes) { -333 this.connection = connection; -334 this.serverName = serverName; -335 this.region = region; -336 this.sink = sink; -337 this.successes = successes; -338 } -339 -340 @Override -341 public Void call() { -342 TableName tableName = null; -343 Table table = null; -344 Get get = null; -345 byte[] startKey = null; -346 Scan scan = null; -347 StopWatch stopWatch = new StopWatch(); -348 // monitor one region on every region server -349 stopWatch.reset(); -350 try { -351 tableName = region.getTable(); -352 table = connection.getTable(tableName); -353 startKey = region.getStartKey(); -354 // Can't do a get on empty start row so do a Scan of first element if any instead. -355 if (LOG.isDebugEnabled()) { -356 LOG.debug(String.format("reading from region server %s table %s region %s and key %s", -357 serverName, region.getTable(), region.getRegionNameAsString(), -358 Bytes.toStringBinary(startKey))); -359 } -360 if (startKey.length > 0) { -361 get = new Get(startKey); -362 get.setCacheBlocks(false); -363 get.setFilter(new FirstKeyOnlyFilter()); -364 stopWatch.start(); -365 table.get(get); -366 stopWatch.stop(); -367 } else { -368 scan = new Scan(); -369 scan.setCacheBlocks(false); -370 scan.setFilter(new FirstKeyOnlyFilter()); -371 scan.setCaching(1); -372 scan.setMaxResultSize(1L); -373 scan.setSmall(true); -374 stopWatch.start(); -375 ResultScanner s = table.getScanner(scan); -376 s.next(); -377 s.close(); -378 stopWatch.stop(); -379 } -380 successes.incrementAndGet(); -381 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); -382 } catch (TableNotFoundException tnfe) { -383 LOG.error("Table may be deleted", tnfe); -384 // This is ignored because it doesn't imply that the regionserver is dead -385 } catch (TableNotEnabledException tnee) { -386 // This is considered a success since we got a response. -387 successes.incrementAndGet(); -388 LOG.debug("The targeted table was disabled. Assuming success."); -389 } catch (DoNotRetryIOException dnrioe) { -390 sink.publishReadFailure(tableName.getNameAsString(), serverName); -391 LOG.error(dnrioe); -392 } catch (IOException e) { -393 sink.publishReadFailure(tableName.getNameAsString(), serverName); -394 LOG.error(e); -395 } finally { -396 if (table != null) { -397 try { -398 table.close(); -399 } catch (IOException e) {/* DO NOTHING */ -400 LOG.error("Close table failed", e); -401 } -402 } -403 scan = null; -404 get = null; -405 startKey = null; -406 } -407 return null; -408 } -409 } -410 -411 private static final int USAGE_EXIT_CODE = 1; -412 private static final int INIT_ERROR_EXIT_CODE = 2; -413 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; -414 private static final int ERROR_EXIT_CODE = 4; -415 -416 private static final long DEFAULT_INTERVAL = 6000; -417 -418 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins -419 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions -420 -421 private static final Log LOG = LogFactory.getLog(Canary.class); -422 -423 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( -424 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); -425 -426 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; -427 -428 private Configuration conf = null; -429 private long interval = 0; -430 private Sink sink = null; -431 -432 private boolean useRegExp; -433 private long timeout = DEFAULT_TIMEOUT; -434 private boolean failOnError = true; -435 private boolean regionServerMode = false; -436 private boolean regionServerAllRegions = false; -437 private boolean writeSniffing = false; -438 private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; -439 -440 private ExecutorService executor; // threads to retrieve data from regionservers -441 -442 public Canary() { -443 this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink()); -444 } -445 -446 public Canary(ExecutorService executor, Sink sink) { -447 this.executor = executor; -448 this.sink = sink; -449 } -450 -451 @Override -452 public Configuration getConf() { -453 return conf; -454 } -455 -456 @Override -457 public void setConf(Configuration conf) { -458 this.conf = conf; -459 } -460 -461 private int parseArgs(String[] args) { -462 int index = -1; -463 // Process command line args -464 for (int i = 0; i < args.length; i++) { -465 String cmd = args[i]; -466 -467 if (cmd.startsWith("-")) { -468 if (index >= 0) { -469 // command line args must be in the form: [opts] [table 1 [table 2 ...]] -470 System.err.println("Invalid command line options"); -471 printUsageAndExit(); -472 } +268 +269 if (LOG.isDebugEnabled()) { +270 LOG.debug(String.format("reading from table %s region %s column family %s and key %s", +271 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), +272 Bytes.toStringBinary(startKey))); +273 } +274 try { +275 stopWatch.start(); +276 if (startKey.length > 0) { +277 table.get(get); +278 } else { +279 rs = table.getScanner(scan); +280 rs.next(); +281 } +282 stopWatch.stop(); +283 sink.publishReadTiming(region, column, stopWatch.getTime()); +284 } catch (Exception e) { +285 sink.publishReadFailure(region, column, e); +286 } finally { +287 if (rs != null) { +288 rs.close(); +289 } +290 scan = null; +291 get = null; +292 startKey = null; +293 } +294 } +295 try { +296 table.close(); +297 } catch (IOException e) { +298 LOG.error("Close table failed", e); +299 } +300 return null; +301 } +302 +303 /** +304 * Check writes for the canary table +305 * @return +306 */ +307 private Void write() { +308 Table table = null; +309 HTableDescriptor tableDesc = null; +310 try { +311 table = connection.getTable(region.getTable()); +312 tableDesc = table.getTableDescriptor(); +313 byte[] rowToCheck = region.getStartKey(); +314 if (rowToCheck.length == 0) { +315 rowToCheck = new byte[]{0x0}; +316 } +317 int writeValueSize = +318 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); +319 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { +320 Put put = new Put(rowToCheck); +321 byte[] value = new byte[writeValueSize]; +322 Bytes.random(value); +323 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); +324 +325 if (LOG.isDebugEnabled()) { +326 LOG.debug(String.format("writing to table %s region %s column family %s and key %s", +327 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), +328 Bytes.toStringBinary(rowToCheck))); +329 } +330 try { +331 long startTime = System.currentTimeMillis(); +332 table.put(put); +333 long time = System.currentTimeMillis() - startTime; +334 sink.publishWriteTiming(region, column, time); +335 } catch (Exception e) { +336 sink.publishWriteFailure(region, column, e); +337 } +338 } +339 table.close(); +340 } catch (IOException e) { +341 sink.publishWriteFailure(region, e); +342 } +343 return null; +344 } +345 } +346 +347 /** +348 * Get one row from a region on the regionserver and outputs the latency, or the failure. +349 */ +350 static class RegionServerTask implements Callable<Void> { +351 private Connection connection; +352 private String serverName; +353 private HRegionInfo region; +354 private ExtendedSink sink; +355 private AtomicLong successes; +356 +357 RegionServerTask(Connection connection, String serverName, HRegionInfo region, +358 ExtendedSink sink, AtomicLong successes) { +359 this.connection = connection; +360 this.serverName = serverName; +361 this.region = region; +362 this.sink = sink; +363 this.successes = successes; +364 } +365 +366 @Override +367 public Void call() { +368 TableName tableName = null; +369 Table table = null; +370 Get get = null; +371 byte[] startKey = null; +372 Scan scan = null; +373 StopWatch stopWatch = new StopWatch(); +374 // monitor one region on every region server +375 stopWatch.reset(); +376 try { +377 tableName = region.getTable(); +378 table = connection.getTable(tableName); +379 startKey = region.getStartKey(); +380 // Can't do a get on empty start row so do a Scan of first element if any instead. +381 if (LOG.isDebugEnabled()) { +382 LOG.debug(String.format("reading from region server %s table %s region %s and key %s", +383 serverName, region.getTable(), region.getRegionNameAsString(), +384 Bytes.toStringBinary(startKey))); +385 } +386 if (startKey.length > 0) { +387 get = new Get(startKey); +388 get.setCacheBlocks(false); +389 get.setFilter(new FirstKeyOnlyFilter()); +390 stopWatch.start(); +391 table.get(get); +392 stopWatch.stop(); +393 } else { +394 scan = new Scan(); +395 scan.setCacheBlocks(false); +396 scan.setFilter(new FirstKeyOnlyFilter()); +397 scan.setCaching(1); +398 scan.setMaxResultSize(1L); +399 scan.setSmall(true); +400 stopWatch.start(); +401 ResultScanner s = table.getScanner(scan); +402 s.next(); +403 s.close(); +404 stopWatch.stop(); +405 } +406 successes.incrementAndGet(); +407 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); +408 } catch (TableNotFoundException tnfe) { +409 LOG.error("Table may be deleted", tnfe); +410 // This is ignored because it doesn't imply that the regionserver is dead +411 } catch (TableNotEnabledException tnee) { +412 // This is considered a success since we got a response. +413 successes.incrementAndGet(); +414 LOG.debug("The targeted table was disabled. Assuming success."); +415 } catch (DoNotRetryIOException dnrioe) { +416 sink.publishReadFailure(tableName.getNameAsString(), serverName); +417 LOG.error(dnrioe); +418 } catch (IOException e) { +419 sink.publishReadFailure(tableName.getNameAsString(), serverName); +420 LOG.error(e); +421 } finally { +422 if (table != null) { +423 try { +424 table.close(); +425 } catch (IOException e) {/* DO NOTHING */ +426 LOG.error("Close table failed", e); +427 } +428 } +429 scan = null; +430 get = null; +431 startKey = null; +432 } +433 return null; +434 } +435 } +436 +437 private static final int USAGE_EXIT_CODE = 1; +438 private static final int INIT_ERROR_EXIT_CODE = 2; +439 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; +440 private static final int ERROR_EXIT_CODE = 4; +441 private static final int FAILURE_EXIT_CODE = 5; +442 +443 private static final long DEFAULT_INTERVAL = 6000; +444 +445 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins +446 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions +447 +448 private static final Log LOG = LogFactory.getLog(Canary.class); +449 +450 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( +451 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); +452 +453 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; +454 +455 private Configuration conf = null; +456 private long interval = 0; +457 private Sink sink = null; +458 +459 private boolean useRegExp; +460 private long timeout = DEFAULT_TIMEOUT; +461 private boolean failOnError = true; +462 private boolean regionServerMode = false; +463 private boolean regionServerAllRegions = false; +464 private boolean writeSniffing = false; +465 private boolean treatFailureAsError = false; +466 private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; +467 +468 private ExecutorService executor; // threads to retrieve data from regionservers +469 +470 public Canary() { +471 this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink()); +472 } 473 -474 if (cmd.equals("-help")) { -475 // user asked for help, print the help and quit. -476 printUsageAndExit(); -477 } else if (cmd.equals("-daemon") && interval == 0) { -478 // user asked for daemon mode, set a default interval between checks -479 interval = DEFAULT_INTERVAL; -480 } else if (cmd.equals("-interval")) { -481 // user has specified an interval for canary breaths (-interval N) -482 i++; +474 public Canary(ExecutorService executor, Sink sink) { +475 this.executor = executor; +476 this.sink = sink; +477 } +478 +479 @Override +480 public Configuration getConf() { +481 return conf; +482 } 483 -484 if (i == args.length) { -485 System.err.println("-interval needs a numeric value argument."); -486 printUsageAndExit(); -487 } +484 @Override +485 public void setConf(Configuration conf) { +486 this.conf = conf; +487 } 488 -489 try { -490 interval = Long.parseLong(args[i]) * 1000; -491 } catch (NumberFormatException e) { -492 System.err.println("-interval needs a numeric value argument."); -493 printUsageAndExit(); -494 } -495 } else if(cmd.equals("-regionserver")) { -496 this.regionServerMode = true; -497 } else if(cmd.equals("-allRegions")) { -498 this.regionServerAllRegions = true; -499 } else if(cmd.equals("-writeSniffing")) { -500 this.writeSniffing = true; -501 } else if (cmd.equals("-e")) { -502 this.useRegExp = true; -503 } else if (cmd.equals("-t")) { -504 i++; -505 -506 if (i == args.length) { -507 System.err.println("-t needs a numeric value argument."); -508 printUsageAndExit(); -509 } -510 -511 try { -512 this.timeout = Long.parseLong(args[i]); -513 } catch (NumberFormatException e) { -514 System.err.println("-t needs a numeric value argument."); -515 printUsageAndExit(); -516 } -517 } else if (cmd.equals("-writeTable")) { -518 i++; -519 -520 if (i == args.length) { -521 System.err.println("-writeTable needs a string value argument."); -522 printUsageAndExit(); -523 } -524 this.writeTableName = TableName.valueOf(args[i]); -525 } else if (cmd.equals("-f")) { -526 i++; -527 -528 if (i == args.length) { -529 System.err -530 .println("-f needs a boolean value argument (true|false)."); -531 printUsageAndExit(); -532 } -533 -534 this.failOnError = Boolean.parseBoolean(args[i]); -535 } else { -536 // no options match -537 System.err.println(cmd + " options is invalid."); -538 printUsageAndExit(); -539 } -540 } else if (index < 0) { -541 // keep track of first table name specified by the user -542 index = i; -543 } -544 } -545 if (this.regionServerAllRegions && !this.regionServerMode) { -546 System.err.println("-allRegions can only be specified in regionserver mode."); -547 printUsageAndExit(); -548 } -549 return index; -550 } -551 -552 @Override -553 public int run(String[] args) throws Exception { -554 int index = parseArgs(args); -555 ChoreService choreService = null; -556 -557 // Launches chore for refreshing kerberos credentials if security is enabled. -558 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster -559 // for more details. -560 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); -561 if (authChore != null) { -562 choreService = new ChoreService("CANARY_TOOL"); -563 choreService.scheduleChore(authChore); -564 } -565 -566 // Start to prepare the stuffs -567 Monitor monitor = null; -568 Thread monitorThread = null; -569 long startTime = 0; -570 long currentTimeLength = 0; -571 // Get a connection to use in below. -572 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { -573 do { -574 // Do monitor !! -575 try { -576 monitor = this.newMonitor(connection, index, args); -577 monitorThread = new Thread(monitor); -578 startTime = System.currentTimeMillis(); -579 monitorThread.start(); -580 while (!monitor.isDone()) { -581 // wait for 1 sec -582 Thread.sleep(1000); -583 // exit if any error occurs -584 if (this.failOnError && monitor.hasError()) { -585 monitorThread.interrupt(); -586 if (monitor.initialized) { -587 return monitor.errorCode; -588 } else { -589 return INIT_ERROR_EXIT_CODE; -590 } -591 } -592 currentTimeLength = System.currentTimeMillis() - startTime; -593 if (currentTimeLength > this.timeout) { -594 LOG.error("The monitor is running too long (" + currentTimeLength -595 + ") after timeout limit:" + this.timeout -596 + " will be killed itself !!"); -597 if (monitor.initialized) { -598 return TIMEOUT_ERROR_EXIT_CODE; -599 } else { -600 return INIT_ERROR_EXIT_CODE; -601 } -602 } -603 } -604 -605 if (this.failOnError && monitor.hasError()) { -606 monitorThread.interrupt(); -607 return monitor.errorCode; -608 } -609 } finally { -610 if (monitor != null) monitor.close(); -611 } -612 -613 Thread.sleep(interval); -614 } while (interval > 0); -615 } // try-with-resources close -616 -617 if (choreService != null) { -618 choreService.shutdown(); -619 } -620 return monitor.errorCode; -621 } -622 -623 private void printUsageAndExit() { -624 System.err.printf( -625 "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n", -626 getClass().getName()); -627 System.err.println(" where [opts] are:"); -628 System.err.println(" -help Show this help and exit."); -629 System.err.println(" -regionserver replace the table argument to regionserver,"); -630 System.err.println(" which means to enable regionserver mode"); -631 System.err.println(" -allRegions Tries all regions on a regionserver,"); -632 System.err.println(" only works in regionserver mode."); -633 System.err.println(" -daemon Continuous check at defined intervals."); -634 System.err.println(" -interval <N> Interval between checks (sec)"); -635 System.err.println(" -e Use table/regions