Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75CEE10AEA for ; Wed, 21 Jan 2015 21:50:29 +0000 (UTC) Received: (qmail 46576 invoked by uid 500); 21 Jan 2015 21:50:29 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 46542 invoked by uid 500); 21 Jan 2015 21:50:29 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 46533 invoked by uid 99); 21 Jan 2015 21:50:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jan 2015 21:50:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 313C9E03AB; Wed, 21 Jan 2015 21:50:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jarcec@apache.org To: commits@flume.apache.org Message-Id: <1152a07698ac431e9226a781757ab004@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2594: Close Async HBase Client if there are large number of consecutive timeouts Date: Wed, 21 Jan 2015 21:50:29 +0000 (UTC) Repository: flume Updated Branches: refs/heads/flume-1.6 6e9d1082e -> da294a164 FLUME-2594: Close Async HBase Client if there are large number of consecutive timeouts (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/da294a16 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/da294a16 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/da294a16 Branch: refs/heads/flume-1.6 Commit: da294a164c27d400dc0b7bc6754c883a4eb5ed24 Parents: 6e9d108 Author: Jarek Jarcec Cecho Authored: Wed Jan 21 13:49:41 2015 -0800 Committer: Jarek Jarcec Cecho Committed: Wed Jan 21 13:50:21 2015 -0800 ---------------------------------------------------------------------- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 74 ++++++++++++++++++-- 1 file changed, 69 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/da294a16/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 1666be4..1d05189 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -127,12 +127,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private boolean batchIncrements = false; private volatile int totalCallbacksReceived = 0; private Map incrementBuffer; + // The HBaseClient buffers the requests until a callback is received. In the event of a + // timeout, there is no way to clear these buffers. If there is a major cluster issue, this + // buffer can become too big and cause crashes. So if we hit a fixed number of HBase write + // failures/timeouts, then close the HBase Client (gracefully or not) and force a GC to get rid + // of the buffered data. + private int consecutiveHBaseFailures = 0; + private boolean lastTxnFailed = false; // Does not need to be thread-safe. Always called only from the sink's // process method. private final Comparator COMPARATOR = UnsignedBytes .lexicographicalComparator(); + private static final int MAX_CONSECUTIVE_FAILS = 10; + public AsyncHBaseSink(){ this(null); } @@ -162,6 +171,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { throw new EventDeliveryException("Sink was never opened. " + "Please fix the configuration."); } + if (client == null) { + client = initHBaseClient(); + if (client == null) { + throw new EventDeliveryException("Could not establish connection to HBase!"); + } + } AtomicBoolean txnFail = new AtomicBoolean(false); AtomicInteger callbacksReceived = new AtomicInteger(0); AtomicInteger callbacksExpected = new AtomicInteger(0); @@ -292,11 +307,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { * */ if (txnFail.get()) { + // We enter this if condition only if the failure was due to HBase failure, so we make sure + // we track the consecutive failures. + if (lastTxnFailed) { + consecutiveHBaseFailures++; + } + lastTxnFailed = true; this.handleTransactionFailure(txn); throw new EventDeliveryException("Could not write events to Hbase. " + "Transaction failed, and rolled back."); } else { try { + lastTxnFailed = false; + consecutiveHBaseFailures = 0; txn.commit(); txn.close(); sinkCounter.addToEventDrainSuccessCount(i); @@ -414,7 +437,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setNameFormat(this.getName() + " HBase Call Pool").build()); logger.info("Callback pool created"); - if(!isTimeoutTest) { + client = initHBaseClient(); + super.start(); + } + + private HBaseClient initHBaseClient() { + if (!isTimeoutTest) { client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); } else { client = new HBaseClient(zkQuorum, zkBaseDir, @@ -454,8 +482,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } if(fail.get()){ sinkCounter.incrementConnectionFailedCount(); - client.shutdown(); - client = null; + if (client != null) { + shutdownHBaseClient(); + } throw new FlumeException( "Could not start sink. " + "Table or column family does not exist in Hbase."); @@ -463,14 +492,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { open = true; } client.setFlushInterval((short) 0); - super.start(); + return client; } @Override public void stop(){ serializer.cleanUp(); if (client != null) { - client.shutdown(); + shutdownHBaseClient(); } sinkCounter.incrementConnectionClosedCount(); sinkCounter.stop(); @@ -496,8 +525,43 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { super.stop(); } + private void shutdownHBaseClient() { + final CountDownLatch waiter = new CountDownLatch(1); + try { + client.shutdown().addCallback(new Callback() { + @Override + public Object call(Object arg) throws Exception { + waiter.countDown(); + return null; + } + }).addErrback(new Callback() { + @Override + public Object call(Object arg) throws Exception { + logger.error("Failed to shutdown HBase client cleanly! HBase cluster might be down"); + waiter.countDown(); + return null; + } + }); + if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) { + logger.error("HBase connection could not be closed within timeout! HBase cluster might " + + "be down!"); + } + } catch (Exception ex) { + logger.warn("Error while attempting to close connections to HBase"); + } finally { + // Dereference the client to force GC to clear up any buffered requests. + client = null; + } + } + private void handleTransactionFailure(Transaction txn) throws EventDeliveryException { + if (consecutiveHBaseFailures >= MAX_CONSECUTIVE_FAILS) { + if (client != null) { + shutdownHBaseClient(); + } + consecutiveHBaseFailures = 0; + } try { txn.rollback(); } catch (Throwable e) {