Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7D591200CFC for ; Thu, 28 Sep 2017 17:14:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7BCDE1609ED; Thu, 28 Sep 2017 15:14:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E5E871609CD for ; Thu, 28 Sep 2017 17:14:31 +0200 (CEST) Received: (qmail 78180 invoked by uid 500); 28 Sep 2017 15:14:14 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 76576 invoked by uid 99); 28 Sep 2017 15:14:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Sep 2017 15:14:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6039F5BCD; Thu, 28 Sep 2017 15:14:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 28 Sep 2017 15:14:48 -0000 Message-Id: In-Reply-To: <0ae50d3876544e89813c25027ef0caaa@git.apache.org> References: <0ae50d3876544e89813c25027ef0caaa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [40/51] [partial] hbase-site git commit: Published site at . archived-at: Thu, 28 Sep 2017 15:14:33 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/67deb422/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.HTableMultiplexerStatus.html ---------------------------------------------------------------------- diff --git a/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.HTableMultiplexerStatus.html b/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.HTableMultiplexerStatus.html index c9037ad..c4a4d8f 100644 --- a/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.HTableMultiplexerStatus.html +++ b/apidocs/src-html/org/apache/hadoop/hbase/client/HTableMultiplexer.HTableMultiplexerStatus.html @@ -27,651 +27,650 @@ 019 */ 020package org.apache.hadoop.hbase.client; 021 -022import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -023import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; -024 -025import java.io.IOException; -026import java.util.AbstractMap.SimpleEntry; -027import java.util.ArrayList; -028import java.util.Collections; -029import java.util.HashMap; -030import java.util.List; -031import java.util.Map; -032import java.util.concurrent.ConcurrentHashMap; -033import java.util.concurrent.ExecutorService; -034import java.util.concurrent.Executors; -035import java.util.concurrent.LinkedBlockingQueue; -036import java.util.concurrent.ScheduledExecutorService; -037import java.util.concurrent.TimeUnit; -038import java.util.concurrent.atomic.AtomicInteger; -039import java.util.concurrent.atomic.AtomicLong; -040 -041import org.apache.commons.logging.Log; -042import org.apache.commons.logging.LogFactory; -043import org.apache.hadoop.conf.Configuration; -044import org.apache.hadoop.hbase.HBaseConfiguration; -045import org.apache.hadoop.hbase.HConstants; -046import org.apache.hadoop.hbase.HRegionInfo; -047import org.apache.hadoop.hbase.HRegionLocation; -048import org.apache.hadoop.hbase.ServerName; -049import org.apache.hadoop.hbase.TableName; -050import org.apache.yetus.audience.InterfaceAudience; -051import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -053 -054/** -055 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. -056 * Each put will be sharded into different buffer queues based on its destination region server. -057 * So each region server buffer queue will only have the puts which share the same destination. -058 * And each queue will have a flush worker thread to flush the puts request to the region server. -059 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that -060 * particular queue. -061 * -062 * Also all the puts will be retried as a configuration number before dropping. -063 * And the HTableMultiplexer can report the number of buffered requests and the number of the -064 * failed (dropped) requests in total or on per region server basis. -065 * -066 * This class is thread safe. -067 */ -068@InterfaceAudience.Public -069public class HTableMultiplexer { -070 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName()); -071 -072 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = -073 "hbase.tablemultiplexer.flush.period.ms"; -074 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; -075 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = -076 "hbase.client.max.retries.in.queue"; -077 -078 /** The map between each region server to its flush worker */ -079 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = -080 new ConcurrentHashMap<>(); -081 -082 private final Configuration workerConf; -083 private final ClusterConnection conn; -084 private final ExecutorService pool; -085 private final int maxAttempts; -086 private final int perRegionServerBufferQueueSize; -087 private final int maxKeyValueSize; -088 private final ScheduledExecutorService executor; -089 private final long flushPeriod; -090 -091 /** -092 * @param conf The HBaseConfiguration -093 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for -094 * each region server before dropping the request. -095 */ -096 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) -097 throws IOException { -098 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); -099 } -100 -101 /** -102 * @param conn The HBase connection. -103 * @param conf The HBase configuration -104 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for -105 * each region server before dropping the request. -106 */ -107 public HTableMultiplexer(Connection conn, Configuration conf, -108 int perRegionServerBufferQueueSize) { -109 this.conn = (ClusterConnection) conn; -110 this.pool = HTable.getDefaultExecutor(conf); -111 // how many times we could try in total, one more than retry number -112 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -113 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; -114 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; -115 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); -116 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); -117 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); -118 this.executor = -119 Executors.newScheduledThreadPool(initThreads, -120 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); -121 -122 this.workerConf = HBaseConfiguration.create(conf); -123 // We do not do the retry because we need to reassign puts to different queues if regions are -124 // moved. -125 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); -126 } -127 -128 /** -129 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already -130 * been closed. -131 * @throws IOException If there is an error closing the connection. -132 */ -133 @SuppressWarnings("deprecation") -134 public synchronized void close() throws IOException { -135 if (!getConnection().isClosed()) { -136 getConnection().close(); -137 } -138 } -139 -140 /** -141 * The put request will be buffered by its corresponding buffer queue. Return false if the queue -142 * is already full. -143 * @param tableName -144 * @param put -145 * @return true if the request can be accepted by its corresponding buffer queue. -146 */ -147 public boolean put(TableName tableName, final Put put) { -148 return put(tableName, put, this.maxAttempts); -149 } -150 -151 /** -152 * The puts request will be buffered by their corresponding buffer queue. -153 * Return the list of puts which could not be queued. -154 * @param tableName -155 * @param puts -156 * @return the list of puts which could not be queued -157 */ -158 public List<Put> put(TableName tableName, final List<Put> puts) { -159 if (puts == null) -160 return null; -161 -162 List <Put> failedPuts = null; -163 boolean result; -164 for (Put put : puts) { -165 result = put(tableName, put, this.maxAttempts); -166 if (result == false) { -167 -168 // Create the failed puts list if necessary -169 if (failedPuts == null) { -170 failedPuts = new ArrayList<>(); -171 } -172 // Add the put to the failed puts list -173 failedPuts.add(put); -174 } -175 } -176 return failedPuts; -177 } -178 -179 /** -180 * @deprecated Use {@link #put(TableName, List) } instead. -181 */ -182 @Deprecated -183 public List<Put> put(byte[] tableName, final List<Put> puts) { -184 return put(TableName.valueOf(tableName), puts); -185 } -186 -187 /** -188 * The put request will be buffered by its corresponding buffer queue. And the put request will be -189 * retried before dropping the request. -190 * Return false if the queue is already full. -191 * @return true if the request can be accepted by its corresponding buffer queue. -192 */ -193 public boolean put(final TableName tableName, final Put put, int maxAttempts) { -194 if (maxAttempts <= 0) { -195 return false; -196 } -197 -198 try { -199 HTable.validatePut(put, maxKeyValueSize); -200 // Allow mocking to get at the connection, but don't expose the connection to users. -201 ClusterConnection conn = (ClusterConnection) getConnection(); -202 // AsyncProcess in the FlushWorker should take care of refreshing the location cache -203 // as necessary. We shouldn't have to do that here. -204 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); -205 if (loc != null) { -206 // Add the put pair into its corresponding queue. -207 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); -208 -209 // Generate a MultiPutStatus object and offer it into the queue -210 PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts); -211 -212 return queue.offer(s); -213 } -214 } catch (IOException e) { -215 LOG.debug("Cannot process the put " + put, e); -216 } -217 return false; -218 } -219 -220 /** -221 * @deprecated Use {@link #put(TableName, Put) } instead. -222 */ -223 @Deprecated -224 public boolean put(final byte[] tableName, final Put put, int retry) { -225 return put(TableName.valueOf(tableName), put, retry); -226 } -227 -228 /** -229 * @deprecated Use {@link #put(TableName, Put)} instead. -230 */ -231 @Deprecated -232 public boolean put(final byte[] tableName, Put put) { -233 return put(TableName.valueOf(tableName), put); -234 } -235 -236 /** -237 * @return the current HTableMultiplexerStatus -238 */ -239 public HTableMultiplexerStatus getHTableMultiplexerStatus() { -240 return new HTableMultiplexerStatus(serverToFlushWorkerMap); -241 } -242 -243 @VisibleForTesting -244 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { -245 FlushWorker worker = serverToFlushWorkerMap.get(addr); -246 if (worker == null) { -247 synchronized (this.serverToFlushWorkerMap) { -248 worker = serverToFlushWorkerMap.get(addr); -249 if (worker == null) { -250 // Create the flush worker -251 worker = new FlushWorker(workerConf, this.conn, addr, this, -252 perRegionServerBufferQueueSize, pool, executor); -253 this.serverToFlushWorkerMap.put(addr, worker); -254 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); -255 } -256 } -257 } -258 return worker.getQueue(); -259 } -260 -261 @VisibleForTesting -262 ClusterConnection getConnection() { -263 return this.conn; -264 } -265 -266 /** -267 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. -268 * report the number of buffered requests and the number of the failed (dropped) requests -269 * in total or on per region server basis. -270 */ -271 @InterfaceAudience.Public -272 public static class HTableMultiplexerStatus { -273 private long totalFailedPutCounter; -274 private long totalBufferedPutCounter; -275 private long maxLatency; -276 private long overallAverageLatency; -277 private Map<String, Long> serverToFailedCounterMap; -278 private Map<String, Long> serverToBufferedCounterMap; -279 private Map<String, Long> serverToAverageLatencyMap; -280 private Map<String, Long> serverToMaxLatencyMap; -281 -282 public HTableMultiplexerStatus( -283 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { -284 this.totalBufferedPutCounter = 0; -285 this.totalFailedPutCounter = 0; -286 this.maxLatency = 0; -287 this.overallAverageLatency = 0; -288 this.serverToBufferedCounterMap = new HashMap<>(); -289 this.serverToFailedCounterMap = new HashMap<>(); -290 this.serverToAverageLatencyMap = new HashMap<>(); -291 this.serverToMaxLatencyMap = new HashMap<>(); -292 this.initialize(serverToFlushWorkerMap); -293 } -294 -295 private void initialize( -296 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { -297 if (serverToFlushWorkerMap == null) { -298 return; -299 } -300 -301 long averageCalcSum = 0; -302 int averageCalcCount = 0; -303 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap -304 .entrySet()) { -305 HRegionLocation addr = entry.getKey(); -306 FlushWorker worker = entry.getValue(); -307 -308 long bufferedCounter = worker.getTotalBufferedCount(); -309 long failedCounter = worker.getTotalFailedCount(); -310 long serverMaxLatency = worker.getMaxLatency(); -311 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); -312 // Get sum and count pieces separately to compute overall average -313 SimpleEntry<Long, Integer> averageComponents = averageCounter -314 .getComponents(); -315 long serverAvgLatency = averageCounter.getAndReset(); -316 -317 this.totalBufferedPutCounter += bufferedCounter; -318 this.totalFailedPutCounter += failedCounter; -319 if (serverMaxLatency > this.maxLatency) { -320 this.maxLatency = serverMaxLatency; -321 } -322 averageCalcSum += averageComponents.getKey(); -323 averageCalcCount += averageComponents.getValue(); -324 -325 this.serverToBufferedCounterMap.put(addr.getHostnamePort(), -326 bufferedCounter); -327 this.serverToFailedCounterMap -328 .put(addr.getHostnamePort(), -329 failedCounter); -330 this.serverToAverageLatencyMap.put(addr.getHostnamePort(), -331 serverAvgLatency); -332 this.serverToMaxLatencyMap -333 .put(addr.getHostnamePort(), -334 serverMaxLatency); -335 } -336 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum -337 / averageCalcCount : 0; -338 } -339 -340 public long getTotalBufferedCounter() { -341 return this.totalBufferedPutCounter; -342 } -343 -344 public long getTotalFailedCounter() { -345 return this.totalFailedPutCounter; -346 } -347 -348 public long getMaxLatency() { -349 return this.maxLatency; -350 } -351 -352 public long getOverallAverageLatency() { -353 return this.overallAverageLatency; -354 } -355 -356 public Map<String, Long> getBufferedCounterForEachRegionServer() { -357 return this.serverToBufferedCounterMap; -358 } -359 -360 public Map<String, Long> getFailedCounterForEachRegionServer() { -361 return this.serverToFailedCounterMap; -362 } -363 -364 public Map<String, Long> getMaxLatencyForEachRegionServer() { -365 return this.serverToMaxLatencyMap; -366 } -367 -368 public Map<String, Long> getAverageLatencyForEachRegionServer() { -369 return this.serverToAverageLatencyMap; -370 } -371 } -372 -373 @VisibleForTesting -374 static class PutStatus { -375 final HRegionInfo regionInfo; -376 final Put put; -377 final int maxAttempCount; -378 -379 public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) { -380 this.regionInfo = regionInfo; -381 this.put = put; -382 this.maxAttempCount = maxAttempCount; -383 } -384 } -385 -386 /** -387 * Helper to count the average over an interval until reset. -388 */ -389 private static class AtomicAverageCounter { -390 private long sum; -391 private int count; -392 -393 public AtomicAverageCounter() { -394 this.sum = 0L; -395 this.count = 0; -396 } -397 -398 public synchronized long getAndReset() { -399 long result = this.get(); -400 this.reset(); -401 return result; -402 } -403 -404 public synchronized long get() { -405 if (this.count == 0) { -406 return 0; -407 } -408 return this.sum / this.count; -409 } -410 -411 public synchronized SimpleEntry<Long, Integer> getComponents() { -412 return new SimpleEntry<>(sum, count); -413 } -414 -415 public synchronized void reset() { -416 this.sum = 0L; -417 this.count = 0; -418 } -419 -420 public synchronized void add(long value) { -421 this.sum += value; -422 this.count++; -423 } -424 } -425 -426 @VisibleForTesting -427 static class FlushWorker implements Runnable { -428 private final HRegionLocation addr; -429 private final LinkedBlockingQueue<PutStatus> queue; -430 private final HTableMultiplexer multiplexer; -431 private final AtomicLong totalFailedPutCount = new AtomicLong(0); -432 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); -433 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); -434 private final AtomicLong maxLatency = new AtomicLong(0); -435 -436 private final AsyncProcess ap; -437 private final List<PutStatus> processingList = new ArrayList<>(); -438 private final ScheduledExecutorService executor; -439 private final int maxRetryInQueue; -440 private final AtomicInteger retryInQueue = new AtomicInteger(0); -441 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor -442 private final int operationTimeout; -443 private final ExecutorService pool; -444 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, -445 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, -446 ExecutorService pool, ScheduledExecutorService executor) { -447 this.addr = addr; -448 this.multiplexer = htableMultiplexer; -449 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); -450 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); -451 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); -452 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, -453 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, -454 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); -455 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, -456 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); -457 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory); -458 this.executor = executor; -459 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); -460 this.pool = pool; -461 } -462 -463 protected LinkedBlockingQueue<PutStatus> getQueue() { -464 return this.queue; -465 } -466 -467 public long getTotalFailedCount() { -468 return totalFailedPutCount.get(); -469 } -470 -471 public long getTotalBufferedCount() { -472 return queue.size() + currentProcessingCount.get(); -473 } -474 -475 public AtomicAverageCounter getAverageLatencyCounter() { -476 return this.averageLatency; -477 } -478 -479 public long getMaxLatency() { -480 return this.maxLatency.getAndSet(0); -481 } -482 -483 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { -484 // Decrease the retry count -485 final int retryCount = ps.maxAttempCount - 1; -486 -487 if (retryCount <= 0) { -488 // Update the failed counter and no retry any more. -489 return false; -490 } -491 -492 int cnt = getRetryInQueue().incrementAndGet(); -493 if (cnt > getMaxRetryInQueue()) { -494 // Too many Puts in queue for resubmit, give up this -495 getRetryInQueue().decrementAndGet(); -496 return false; -497 } -498 -499 final Put failedPut = ps.put; -500 // The currentPut is failed. So get the table name for the currentPut. -501 final TableName tableName = ps.regionInfo.getTable(); -502 -503 long delayMs = getNextDelay(retryCount); -504 if (LOG.isDebugEnabled()) { -505 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); -506 } -507 -508 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating -509 // the region location cache when the Put original failed with some exception. If we keep -510 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff -511 // that we expect it to. -512 getExecutor().schedule(new Runnable() { -513 @Override -514 public void run() { -515 boolean succ = false; -516 try { -517 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); -518 } finally { -519 FlushWorker.this.getRetryInQueue().decrementAndGet(); -520 if (!succ) { -521 FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); -522 } -523 } -524 } -525 }, delayMs, TimeUnit.MILLISECONDS); -526 return true; -527 } -528 -529 @VisibleForTesting -530 long getNextDelay(int retryCount) { -531 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, -532 multiplexer.maxAttempts - retryCount - 1); -533 } -534 -535 @VisibleForTesting -536 AtomicInteger getRetryInQueue() { -537 return this.retryInQueue; -538 } -539 -540 @VisibleForTesting -541 int getMaxRetryInQueue() { -542 return this.maxRetryInQueue; -543 } -544 -545 @VisibleForTesting -546 AtomicLong getTotalFailedPutCount() { -547 return this.totalFailedPutCount; -548 } -549 -550 @VisibleForTesting -551 HTableMultiplexer getMultiplexer() { -552 return this.multiplexer; -553 } -554 -555 @VisibleForTesting -556 ScheduledExecutorService getExecutor() { -557 return this.executor; -558 } -559 -560 @Override -561 public void run() { -562 int failedCount = 0; -563 try { -564 long start = EnvironmentEdgeManager.currentTime(); -565 -566 // drain all the queued puts into the tmp list -567 processingList.clear(); -568 queue.drainTo(processingList); -569 if (processingList.isEmpty()) { -570 // Nothing to flush -571 return; -572 } -573 -574 currentProcessingCount.set(processingList.size()); -575 // failedCount is decreased whenever a Put is success or resubmit. -576 failedCount = processingList.size(); -577 -578 List<Action> retainedActions = new ArrayList<>(processingList.size()); -579 MultiAction actions = new MultiAction(); -580 for (int i = 0; i < processingList.size(); i++) { -581 PutStatus putStatus = processingList.get(i); -582 Action action = new Action(putStatus.put, i); -583 actions.add(putStatus.regionInfo.getRegionName(), action); -584 retainedActions.add(action); -585 } -586 -587 // Process this multi-put request -588 List<PutStatus> failed = null; -589 Object[] results = new Object[actions.size()]; -590 ServerName server = addr.getServerName(); -591 Map<ServerName, MultiAction> actionsByServer = -592 Collections.singletonMap(server, actions); -593 try { -594 AsyncProcessTask task = AsyncProcessTask.newBuilder() -595 .setResults(results) -596 .setPool(pool) -597 .setRpcTimeout(writeRpcTimeout) -598 .setOperationTimeout(operationTimeout) -599 .build(); -600 AsyncRequestFuture arf = -601 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); -602 arf.waitUntilDone(); -603 if (arf.hasError()) { -604 // We just log and ignore the exception here since failed Puts will be resubmit again. -605 LOG.debug("Caught some exceptions when flushing puts to region server " -606 + addr.getHostnamePort(), arf.getErrors()); -607 } -608 } finally { -609 for (int i = 0; i < results.length; i++) { -610 if (results[i] instanceof Result) { -611 failedCount--; -612 } else { -613 if (failed == null) { -614 failed = new ArrayList<>(); -615 } -616 failed.add(processingList.get(i)); -617 } -618 } -619 } -620 -621 if (failed != null) { -622 // Resubmit failed puts -623 for (PutStatus putStatus : failed) { -624 if (resubmitFailedPut(putStatus, this.addr)) { -625 failedCount--; -626 } -627 } -628 } -629 -630 long elapsed = EnvironmentEdgeManager.currentTime() - start; -631 // Update latency counters -632 averageLatency.add(elapsed); -633 if (elapsed > maxLatency.get()) { -634 maxLatency.set(elapsed); -635 } -636 -637 // Log some basic info -638 if (LOG.isDebugEnabled()) { -639 LOG.debug("Processed " + currentProcessingCount + " put requests for " -640 + addr.getHostnamePort() + " and " + failedCount + " failed" -641 + ", latency for this send: " + elapsed); -642 } -643 -644 // Reset the current processing put count -645 currentProcessingCount.set(0); -646 } catch (RuntimeException e) { -647 // To make findbugs happy -648 // Log all the exceptions and move on -649 LOG.debug( -650 "Caught some exceptions " + e + " when flushing puts to region server " -651 + addr.getHostnamePort(), e); -652 } catch (Exception e) { -653 if (e instanceof InterruptedException) { -654 Thread.currentThread().interrupt(); -655 } -656 // Log all the exceptions and move on -657 LOG.debug( -658 "Caught some exceptions " + e + " when flushing puts to region server " -659 + addr.getHostnamePort(), e); -660 } finally { -661 // Update the totalFailedCount -662 this.totalFailedPutCount.addAndGet(failedCount); -663 } -664 } -665 } -666} +022import java.io.IOException; +023import java.util.AbstractMap.SimpleEntry; +024import java.util.ArrayList; +025import java.util.Collections; +026import java.util.HashMap; +027import java.util.List; +028import java.util.Map; +029import java.util.concurrent.ConcurrentHashMap; +030import java.util.concurrent.ExecutorService; +031import java.util.concurrent.Executors; +032import java.util.concurrent.LinkedBlockingQueue; +033import java.util.concurrent.ScheduledExecutorService; +034import java.util.concurrent.TimeUnit; +035import java.util.concurrent.atomic.AtomicInteger; +036import java.util.concurrent.atomic.AtomicLong; +037 +038import org.apache.commons.logging.Log; +039import org.apache.commons.logging.LogFactory; +040import org.apache.hadoop.conf.Configuration; +041import org.apache.hadoop.hbase.HBaseConfiguration; +042import org.apache.hadoop.hbase.HConstants; +043import org.apache.hadoop.hbase.HRegionLocation; +044import org.apache.hadoop.hbase.ServerName; +045import org.apache.hadoop.hbase.TableName; +046import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +048import org.apache.yetus.audience.InterfaceAudience; +049 +050import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +051import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +052 +053/** +054 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. +055 * Each put will be sharded into different buffer queues based on its destination region server. +056 * So each region server buffer queue will only have the puts which share the same destination. +057 * And each queue will have a flush worker thread to flush the puts request to the region server. +058 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that +059 * particular queue. +060 * +061 * Also all the puts will be retried as a configuration number before dropping. +062 * And the HTableMultiplexer can report the number of buffered requests and the number of the +063 * failed (dropped) requests in total or on per region server basis. +064 * +065 * This class is thread safe. +066 */ +067@InterfaceAudience.Public +068public class HTableMultiplexer { +069 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName()); +070 +071 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = +072 "hbase.tablemultiplexer.flush.period.ms"; +073 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; +074 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = +075 "hbase.client.max.retries.in.queue"; +076 +077 /** The map between each region server to its flush worker */ +078 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = +079 new ConcurrentHashMap<>(); +080 +081 private final Configuration workerConf; +082 private final ClusterConnection conn; +083 private final ExecutorService pool; +084 private final int maxAttempts; +085 private final int perRegionServerBufferQueueSize; +086 private final int maxKeyValueSize; +087 private final ScheduledExecutorService executor; +088 private final long flushPeriod; +089 +090 /** +091 * @param conf The HBaseConfiguration +092 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for +093 * each region server before dropping the request. +094 */ +095 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) +096 throws IOException { +097 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); +098 } +099 +100 /** +101 * @param conn The HBase connection. +102 * @param conf The HBase configuration +103 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for +104 * each region server before dropping the request. +105 */ +106 public HTableMultiplexer(Connection conn, Configuration conf, +107 int perRegionServerBufferQueueSize) { +108 this.conn = (ClusterConnection) conn; +109 this.pool = HTable.getDefaultExecutor(conf); +110 // how many times we could try in total, one more than retry number +111 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, +112 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; +113 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; +114 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); +115 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); +116 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); +117 this.executor = +118 Executors.newScheduledThreadPool(initThreads, +119 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); +120 +121 this.workerConf = HBaseConfiguration.create(conf); +122 // We do not do the retry because we need to reassign puts to different queues if regions are +123 // moved. +124 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); +125 } +126 +127 /** +128 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already +129 * been closed. +130 * @throws IOException If there is an error closing the connection. +131 */ +132 @SuppressWarnings("deprecation") +133 public synchronized void close() throws IOException { +134 if (!getConnection().isClosed()) { +135 getConnection().close(); +136 } +137 } +138 +139 /** +140 * The put request will be buffered by its corresponding buffer queue. Return false if the queue +141 * is already full. +142 * @param tableName +143 * @param put +144 * @return true if the request can be accepted by its corresponding buffer queue. +145 */ +146 public boolean put(TableName tableName, final Put put) { +147 return put(tableName, put, this.maxAttempts); +148 } +149 +150 /** +151 * The puts request will be buffered by their corresponding buffer queue. +152 * Return the list of puts which could not be queued. +153 * @param tableName +154 * @param puts +155 * @return the list of puts which could not be queued +156 */ +157 public List<Put> put(TableName tableName, final List<Put> puts) { +158 if (puts == null) +159 return null; +160 +161 List <Put> failedPuts = null; +162 boolean result; +163 for (Put put : puts) { +164 result = put(tableName, put, this.maxAttempts); +165 if (result == false) { +166 +167 // Create the failed puts list if necessary +168 if (failedPuts == null) { +169 failedPuts = new ArrayList<>(); +170 } +171 // Add the put to the failed puts list +172 failedPuts.add(put); +173 } +174 } +175 return failedPuts; +176 } +177 +178 /** +179 * @deprecated Use {@link #put(TableName, List) } instead. +180 */ +181 @Deprecated +182 public List<Put> put(byte[] tableName, final List<Put> puts) { +183 return put(TableName.valueOf(tableName), puts); +184 } +185 +186 /** +187 * The put request will be buffered by its corresponding buffer queue. And the put request will be +188 * retried before dropping the request. +189 * Return false if the queue is already full. +190 * @return true if the request can be accepted by its corresponding buffer queue. +191 */ +192 public boolean put(final TableName tableName, final Put put, int maxAttempts) { +193 if (maxAttempts <= 0) { +194 return false; +195 } +196 +197 try { +198 HTable.validatePut(put, maxKeyValueSize); +199 // Allow mocking to get at the connection, but don't expose the connection to users. +200 ClusterConnection conn = (ClusterConnection) getConnection(); +201 // AsyncProcess in the FlushWorker should take care of refreshing the location cache +202 // as necessary. We shouldn't have to do that here. +203 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); +204 if (loc != null) { +205 // Add the put pair into its corresponding queue. +206 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); +207 +208 // Generate a MultiPutStatus object and offer it into the queue +209 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); +210 +211 return queue.offer(s); +212 } +213 } catch (IOException e) { +214 LOG.debug("Cannot process the put " + put, e); +215 } +216 return false; +217 } +218 +219 /** +220 * @deprecated Use {@link #put(TableName, Put) } instead. +221 */ +222 @Deprecated +223 public boolean put(final byte[] tableName, final Put put, int retry) { +224 return put(TableName.valueOf(tableName), put, retry); +225 } +226 +227 /** +228 * @deprecated Use {@link #put(TableName, Put)} instead. +229 */ +230 @Deprecated +231 public boolean put(final byte[] tableName, Put put) { +232 return put(TableName.valueOf(tableName), put); +233 } +234 +235 /** +236 * @return the current HTableMultiplexerStatus +237 */ +238 public HTableMultiplexerStatus getHTableMultiplexerStatus() { +239 return new HTableMultiplexerStatus(serverToFlushWorkerMap); +240 } +241 +242 @VisibleForTesting +243 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { +244 FlushWorker worker = serverToFlushWorkerMap.get(addr); +245 if (worker == null) { +246 synchronized (this.serverToFlushWorkerMap) { +247 worker = serverToFlushWorkerMap.get(addr); +248 if (worker == null) { +249 // Create the flush worker +250 worker = new FlushWorker(workerConf, this.conn, addr, this, +251 perRegionServerBufferQueueSize, pool, executor); +252 this.serverToFlushWorkerMap.put(addr, worker); +253 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); +254 } +255 } +256 } +257 return worker.getQueue(); +258 } +259 +260 @VisibleForTesting +261 ClusterConnection getConnection() { +262 return this.conn; +263 } +264 +265 /** +266 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. +267 * report the number of buffered requests and the number of the failed (dropped) requests +268 * in total or on per region server basis. +269 */ +270 @InterfaceAudience.Public +271 public static class HTableMultiplexerStatus { +272 private long totalFailedPutCounter; +273 private long totalBufferedPutCounter; +274 private long maxLatency; +275 private long overallAverageLatency; +276 private Map<String, Long> serverToFailedCounterMap; +277 private Map<String, Long> serverToBufferedCounterMap; +278 private Map<String, Long> serverToAverageLatencyMap; +279 private Map<String, Long> serverToMaxLatencyMap; +280 +281 public HTableMultiplexerStatus( +282 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {<