From: dimaspivak@apache.org
To: commits@hbase.apache.org
Date: Thu, 29 Sep 2016 20:09:57 -0000
Subject: [27/52] [partial] hbase-site git commit: Published site at 63808a224c8689d07e55f90efd25f9597b0d04dd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/044b3379/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.html
----------------------------------------------------------------------

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
 * Each put is sharded into a buffer queue based on its destination region server, so each
 * region server queue holds only the puts that share the same destination, and each queue
 * has a flush worker thread that flushes the buffered puts to that region server. If a
 * queue is full, the HTableMultiplexer drops further Put requests for that particular
 * queue.
 *
 * Puts are retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of failed
 * (dropped) requests, both in total and per region server.
 *
 * This class is thread safe.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
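  // A minimal usage sketch (not part of the original source): the table name, column
  // family/qualifier, and queue size below are illustrative assumptions, and Bytes is
  // org.apache.hadoop.hbase.util.Bytes.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000000);
  //   Put put = new Put(Bytes.toBytes("row-1"));
  //   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
  //   boolean queued = multiplexer.put(TableName.valueOf("usertable"), put);
  //   if (!queued) {
  //     // The queue for that put's region server is full; back off or handle the rejection.
  //   }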
  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";

  /** The map from each region server to its flush worker. */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  private final Configuration workerConf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of Put ops buffered for
   *          each region server before requests are dropped.
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * @param conn The HBase connection.
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of Put ops buffered for
   *          each region server before requests are dropped.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
      int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // How many times we could try in total: one more than the retry number.
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor =
        Executors.newScheduledThreadPool(initThreads,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());

    this.workerConf = HBaseConfiguration.create(conf);
    // We do not retry inside the workers because we need to reassign puts to different
    // queues if regions are moved.
    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
   * been closed.
   * @throws IOException If there is an error closing the connection.
   */
  @SuppressWarnings("deprecation")
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }
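  // A configuration sketch for the tunables declared above; the values are illustrative
  // assumptions, not recommendations (the defaults shown come from the constructor).
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 50);       // default 100
  //   conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_INIT_THREADS, 20);           // default 10
  //   conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 5000); // default 10000
  //   HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000000);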
  /**
   * The put request will be buffered by its corresponding buffer queue. Returns false if the
   * queue is already full.
   * @param tableName the table the put belongs to
   * @param put the put to buffer
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.maxAttempts);
  }

  /**
   * Each put request will be buffered by its corresponding buffer queue.
   * Returns the list of puts which could not be queued.
   * @param tableName the table the puts belong to
   * @param puts the puts to buffer
   * @return the list of puts which could not be queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null)
      return null;

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.maxAttempts);
      if (!result) {

        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<Put>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * @deprecated Use {@link #put(TableName, List)} instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue, and retried up to
   * maxAttempts times before being dropped.
   * Returns false if the queue is already full.
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
    if (maxAttempts <= 0) {
      return false;
    }

    try {
      HTable.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
      // as necessary. We shouldn't have to do that here.
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put pair into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a MultiPutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * @deprecated Use {@link #put(TableName, Put, int)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @deprecated Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }
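  // A hedged sketch of handling the batch API's return value: put(TableName, List) returns
  // the puts that could not be queued, or null once everything is accepted. The requeue
  // loop and the sleep interval are illustrative assumptions.
  //
  //   List<Put> failed = multiplexer.put(TableName.valueOf("usertable"), puts);
  //   while (failed != null) {
  //     Thread.sleep(100); // give the flush workers a chance to drain the queues
  //     failed = multiplexer.put(TableName.valueOf("usertable"), failed);
  //   }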
  /**
   * @return the current HTableMultiplexerStatus
   */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  @VisibleForTesting
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(workerConf, this.conn, addr, this,
              perRegionServerBufferQueueSize, pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  @VisibleForTesting
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
   * It reports the number of buffered requests and the number of failed (dropped) requests,
   * both in total and per region server.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<String, Long>();
      this.serverToFailedCounterMap = new HashMap<String, Long>();
      this.serverToAverageLatencyMap = new HashMap<String, Long>();
      this.serverToMaxLatencyMap = new HashMap<String, Long>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency =
          averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }
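    // A hedged monitoring sketch using the getters below. Note that taking a snapshot
    // calls getAndReset() on each worker's latency counter, so per-server averages cover
    // the interval since the previous snapshot. The poll cadence is an assumption.
    //
    //   HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
    //   long buffered = status.getTotalBufferedCounter();
    //   long dropped = status.getTotalFailedCounter();
    //   Map<String, Long> perServer = status.getBufferedCounterForEachRegionServer();
    //   LOG.info("buffered=" + buffered + ", dropped=" + dropped
    //       + ", servers=" + perServer.size());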
    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  @VisibleForTesting
  static class PutStatus {
    final HRegionInfo regionInfo;
    final Put put;
    final int maxAttempCount;

    public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttempCount = maxAttempCount;
    }
  }
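  // A small behavioral sketch of the helper class below (the sample values are illustrative):
  //
  //   AtomicAverageCounter latency = new AtomicAverageCounter();
  //   latency.add(10);
  //   latency.add(20);
  //   latency.get();         // 15: integer average of the samples so far
  //   latency.getAndReset(); // returns 15, then clears sum and count
  //   latency.get();         // 0 again, until new samples arrive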
  /**
   * Helper that maintains a running average over an interval, until reset.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<Long, Integer>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  @VisibleForTesting
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
          writeRpcTimeout);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
    }
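    // A hedged note on the timeout fallback in the constructor above: the write-specific
    // RPC timeout is read first, falling back to the general RPC timeout, then to the
    // compiled-in default. Illustrative values (using the HConstants keys the code reads):
    //
    //   conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30000);       // reads and writes
    //   conf.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 15000); // overrides writes only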
    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
      return queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttempCount - 1;

      if (retryCount <= 0) {
        // No retries left; the caller will update the failed counter and the put is dropped.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many puts are already queued for resubmission; give this one up.
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The put failed, so look up the table name it was destined for.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put originally failed with some exception. If we
      // keep retrying the same Put against the same location, AsyncProcess isn't doing the
      // right stuff that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    @VisibleForTesting
    long getNextDelay(int retryCount) {
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
          multiplexer.maxAttempts - retryCount - 1);
    }
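    // A hedged sketch of the resulting backoff: getNextDelay() scales the flush period by
    // HBase's standard retry backoff multipliers (1, 2, 3, 5, 10, ...), so with the default
    // 100 ms flush period the first resubmissions are delayed roughly:
    //
    //   resubmit 1 -> ~100 ms, resubmit 2 -> ~200 ms, resubmit 3 -> ~300 ms,
    //   resubmit 4 -> ~500 ms, resubmit 5 -> ~1000 ms, ...
    //
    // The exact values depend on ConnectionUtils.getPauseTime(), which adds a small random
    // jitter on top of the scaled pause.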
    @VisibleForTesting
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @VisibleForTesting
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @VisibleForTesting
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @VisibleForTesting
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @VisibleForTesting
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // Drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.size() == 0) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decremented whenever a put succeeds or is resubmitted.
        failedCount = processingList.size();

        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
        MultiAction<Row> actions = new MultiAction<>();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action<Row> action = new Action<Row>(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction<Row>> actionsByServer =
            Collections.singletonMap(server, actions);
        try {
          AsyncRequestFuture arf =
              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
                  null, actionsByServer, null);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here, since failed puts will be
            // resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
                + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<PutStatus>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processed " + currentProcessingCount + " put requests for "
              + addr.getHostnamePort() + " and " + failedCount + " failed"
              + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug(
            "Caught some exceptions " + e + " when flushing puts to region server "
                + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug(
            "Caught some exceptions " + e + " when flushing puts to region server "
                + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}
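A hedged end-to-end sketch of the API above; the table name, row keys, column names, and
queue size are illustrative assumptions, and Bytes is org.apache.hadoop.hbase.util.Bytes:

    Configuration conf = HBaseConfiguration.create();
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 500000);
    try {
      List<Put> puts = new ArrayList<Put>();
      for (int i = 0; i < 1000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v-" + i));
        puts.add(put);
      }
      // Rejected puts (full queues) come back; null means everything was accepted.
      List<Put> failed = multiplexer.put(TableName.valueOf("usertable"), puts);
      if (failed != null) {
        LOG.warn(failed.size() + " puts were rejected because their queues were full");
      }
      HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
      LOG.info("buffered=" + status.getTotalBufferedCounter()
          + ", dropped=" + status.getTotalFailedCounter());
    } finally {
      multiplexer.close(); // closes the internal Connection
    }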