flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject flume git commit: FLUME-2738. Fix file descriptor leak in AsyncHBaseSink when HBase cluster goes down.
Date Thu, 09 Jul 2015 19:23:05 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.7 200e6dcbf -> 90e13ab8b


FLUME-2738. Fix file descriptor leak in AsyncHBaseSink when HBase cluster goes down.

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/90e13ab8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/90e13ab8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/90e13ab8

Branch: refs/heads/flume-1.7
Commit: 90e13ab8becf763258883d13637e34fe34342561
Parents: 200e6dc
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Jul 9 12:21:17 2015 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Jul 9 12:22:27 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/AsyncHBaseSink.java | 18 +++---
 .../flume/sink/hbase/TestAsyncHBaseSink.java    | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/90e13ab8/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 80a3484..eac00f6 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -437,23 +437,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable
{
             + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
-      sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setNameFormat(this.getName() + " HBase Call Pool").build());
-    logger.info("Callback pool created");
     client = initHBaseClient();
     super.start();
   }
 
   private HBaseClient initHBaseClient() {
     logger.info("Initializing HBase Client");
-    if (!isTimeoutTest) {
-      client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
-    } else {
-      client = new HBaseClient(zkQuorum, zkBaseDir,
-        new NioClientSocketChannelFactory(Executors
-          .newSingleThreadExecutor(),
-          Executors.newSingleThreadExecutor()));
-    }
+
+    sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+            .setNameFormat(this.getName() + " HBase Call Pool").build());
+    logger.info("Callback pool created");
+    client = new HBaseClient(zkQuorum, zkBaseDir,
+            new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool));
+
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicBoolean fail = new AtomicBoolean(false);
     client.ensureTableFamilyExists(

http://git-wip-us.apache.org/repos/asf/flume/blob/90e13ab8/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index af90f99..b4bbd6b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -20,6 +20,8 @@
 package org.apache.flume.sink.hbase;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,6 +51,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.primitives.Longs;
+import com.sun.management.UnixOperatingSystemMXBean;
 
 import org.junit.After;
 
@@ -62,6 +65,7 @@ public class TestAsyncHBaseSink {
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
   private boolean deleteTable = true;
+  private static OperatingSystemMXBean os;
 
 
   @BeforeClass
@@ -78,6 +82,8 @@ public class TestAsyncHBaseSink {
     ctxMap.put("keep-alive", "0");
     ctxMap.put("timeout", "10000");
     ctx.putAll(ctxMap);
+
+    os = ManagementFactory.getOperatingSystemMXBean();
   }
 
   @AfterClass
@@ -448,6 +454,66 @@ public class TestAsyncHBaseSink {
     sink.process();
     sink.stop();
   }
+
+  // We only have support for getting File Descriptor count for Unix from the JDK
+  private long getOpenFileDescriptorCount() {
+    if(os instanceof UnixOperatingSystemMXBean){
+      return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
+    } else {
+      return -1;
+    }
+  }
+
+  /*
+   * Before the fix for FLUME-2738, consistently File Descriptors were leaked with at least
+   * > 10 FDs being leaked for every single shutdown-reinitialize routine
+   * If there is a leak, then the increase in FDs should be way higher than
+   * 50 and if there is no leak, there should not be any substantial increase in
+   * FDs. This is over a set of 10 shutdown-reinitialize runs
+   * This test makes sure that there is no File Descriptor leak, by continuously
+   * failing transactions and shutting down and reinitializing the client every time
+   * and this test will fail if a leak is detected
+   */
+  @Test
+  public void testFDLeakOnShutdown() throws Exception {
+    if(getOpenFileDescriptorCount() < 0) {
+      return;
+    }
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
+            true, false);
+    ctx.put("maxConsecutiveFails", "1");
+    Configurables.configure(sink, ctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, ctx);
+    sink.setChannel(channel);
+    channel.start();
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for(int i = 0; i < 3; i++){
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    Assert.assertFalse(sink.isConfNull());
+    long initialFDCount = getOpenFileDescriptorCount();
+
+    // Since the isTimeOutTest is set to true, transaction will fail
+    // with EventDeliveryException
+    for(int i = 0; i < 10; i ++) {
+      try {
+        sink.process();
+      } catch (EventDeliveryException ex) {
+      }
+    }
+    long increaseInFD = getOpenFileDescriptorCount() - initialFDCount;
+    Assert.assertTrue("File Descriptor leak detected. FDs have increased by " +
+      increaseInFD + " from an initial FD count of " + initialFDCount,  increaseInFD <
50);
+  }
+
   /**
    * This test must run last - it shuts down the minicluster :D
    * @throws Exception


Mime
View raw message