hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject hbase git commit: HBASE-16117 Fix Connection leak in mapred.TableOutputFormat
Date Thu, 30 Jun 2016 00:19:03 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 42106b89b -> e1d130946


HBASE-16117 Fix Connection leak in mapred.TableOutputFormat


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1d13094
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1d13094
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1d13094

Branch: refs/heads/master
Commit: e1d130946bd76d38fee421daf167de68d620fd51
Parents: 42106b8
Author: Jonathan M Hsieh <jmhsieh@apache.org>
Authored: Sun Jun 26 17:05:49 2016 -0700
Committer: Jonathan M Hsieh <jmhsieh@apache.org>
Committed: Wed Jun 29 17:18:43 2016 -0700

----------------------------------------------------------------------
 .../hbase/client/ConnectionImplementation.java  |   2 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |  37 +++----
 .../apache/hadoop/hbase/client/Registry.java    |   4 +-
 .../hadoop/hbase/client/ZooKeeperRegistry.java  |   4 +-
 .../hadoop/hbase/mapred/TableOutputFormat.java  |  31 +++---
 .../hadoop/hbase/client/TestClientTimeouts.java |   5 +-
 .../TestTableOutputFormatConnectionExhaust.java | 104 +++++++++++++++++++
 7 files changed, 147 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index d93a8b4..1fe29c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -436,7 +436,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
 
   protected String clusterId = null;
 
-  protected void retrieveClusterId() {
+  protected void retrieveClusterId() throws IOException {
     if (clusterId != null) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index e21a5d2..da0de51 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -2077,29 +2077,24 @@ public class HBaseAdmin implements Admin {
     // We set it to make it fail as soon as possible if HBase is not available
     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     copyOfConf.setInt("zookeeper.recovery.retry", 0);
+
+    // Check ZK first.
+    // If the connection exists, we may have a connection to ZK that does not work anymore
     try (ClusterConnection connection =
-        (ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) {
-        // Check ZK first.
-        // If the connection exists, we may have a connection to ZK that does not work anymore
-        ZooKeeperKeepAliveConnection zkw = null;
-        try {
-          // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
-          zkw = ((ConnectionImplementation)connection).
-            getKeepAliveZooKeeperWatcher();
-          zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
-        } catch (IOException e) {
-          throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
-        } catch (InterruptedException e) {
-          throw (InterruptedIOException)
-            new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
-        } catch (KeeperException e) {
-          throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
-        } finally {
-          if (zkw != null) {
-            zkw.close();
-          }
-        }
+             (ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
+         ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
+             getKeepAliveZooKeeperWatcher();) {
+
+      // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
+      zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
       connection.isMasterRunning();
+    } catch (IOException e) {
+      throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)
+          new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
+    } catch (KeeperException e) {
+      throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
index 412e4fa..d20a4bd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
@@ -32,7 +32,7 @@ interface Registry {
   /**
    * @param connection
    */
-  void init(Connection connection);
+  void init(Connection connection) throws IOException;
 
   /**
    * @return Meta region location
@@ -43,7 +43,7 @@ interface Registry {
   /**
    * @return Cluster id.
    */
-  String getClusterId();
+  String getClusterId() throws IOException;
 
   /**
    * @return Count of 'running' regionservers

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
index 9208f6e..83f828c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
@@ -92,7 +92,7 @@ class ZooKeeperRegistry implements Registry {
   private String clusterId = null;
 
   @Override
-  public String getClusterId() {
+  public String getClusterId() throws IOException  {
     if (this.clusterId != null) return this.clusterId;
     // No synchronized here, worse case we will retrieve it twice, that's
     //  not an issue.
@@ -105,8 +105,10 @@ class ZooKeeperRegistry implements Registry {
       }
     } catch (KeeperException e) {
       LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
+      throw new IOException("ZooKeeperException ", e);
     } catch (IOException e) {
       LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
+      throw e;
     } finally {
       if (zkw != null) zkw.close();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
index 18b54da..0c5d5f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -53,27 +53,31 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
     private BufferedMutator m_mutator;
-    private Connection connection;
+    private Connection conn;
+
     /**
-     * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
-     * lifecycle of {@code conn}.
+     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
      */
-    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
-      this.m_mutator = mutator;
-    }
-
     public TableRecordWriter(JobConf job) throws IOException {
       // expecting exactly one path
       TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
-      connection = ConnectionFactory.createConnection(job);
-      m_mutator = connection.getBufferedMutator(tableName);
+      try {
+        this.conn = ConnectionFactory.createConnection(job);
+        this.m_mutator = conn.getBufferedMutator(tableName);
+      } finally {
+        if (this.m_mutator == null) {
+          conn.close();
+          conn = null;
+        }
+      }
     }
 
     public void close(Reporter reporter) throws IOException {
-      this.m_mutator.close();
-      if (connection != null) {
-        connection.close();
-        connection = null;
+      if (this.m_mutator != null) {
+        this.m_mutator.close();
+      }
+      if (conn != null) {
+        this.conn.close();
       }
     }
 
@@ -101,6 +105,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
       Progressable progress)
   throws IOException {
+    // Clear write buffer on fail is true by default so no need to reset it.
     return new TableRecordWriter(job);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 109e416..679d9c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
@@ -106,9 +107,9 @@ public class TestClientTimeouts {
           // run some admin commands
           HBaseAdmin.checkHBaseAvailable(conf);
           admin.setBalancerRunning(false, false);
-        } catch (MasterNotRunningException ex) {
+        } catch (ZooKeeperConnectionException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
-          // a MasterNotRunningException.  It's a bug if we get other exceptions.
+          // a ZooKeeperConnectionException.  It's a bug if we get other exceptions.
           lastFailed = true;
         } finally {
           if(admin != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d13094/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java
new file mode 100644
index 0000000..835117c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Spark creates many instances of TableOutputFormat within a single process.  We need to make
+ * sure we can have many instances and not leak connections.
+ *
+ * This test creates a few TableOutputFormats and shouldn't fail due to ZK connection exhaustion.
+ */
+@Category(MediumTests.class)
+public class TestTableOutputFormatConnectionExhaust {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestTableOutputFormatConnectionExhaust.class);
+
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  static final String TABLE = "TestTableOutputFormatConnectionExhaust";
+  static final String FAMILY = "family";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Default in ZookeeperMiniCluster is 1000, setting artificially low to trigger exhaustion.
+    // need min of 7 to properly start the default mini HBase cluster
+    UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 10);
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException {
+    LOG.info("before");
+    UTIL.ensureSomeRegionServersAvailable(1);
+    LOG.info("before done");
+  }
+
+  /**
+   * Open and close a TableOutputFormat.  The closing the RecordWriter should release HBase
+   * Connection (ZK) resources, and will throw exception if they are exhausted.
+   */
+  static void openCloseTableOutputFormat(int iter)  throws IOException {
+    LOG.info("Instantiating TableOutputFormat connection  " + iter);
+    JobConf conf = new JobConf();
+    conf.addResource(UTIL.getConfiguration());
+    conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE);
+    TableMapReduceUtil.initTableMapJob(TABLE, FAMILY, TableMap.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, conf);
+    TableOutputFormat tof = new TableOutputFormat();
+    RecordWriter rw = tof.getRecordWriter(null, conf, TABLE, null);
+    rw.close(null);
+  }
+
+  @Test
+  public void testConnectionExhaustion() throws IOException {
+    int MAX_INSTANCES = 5; // fails on iteration 3 if zk connections leak
+    for (int i = 0; i < MAX_INSTANCES; i++) {
+      final int iter = i;
+      try {
+        openCloseTableOutputFormat(iter);
+      } catch (Exception e) {
+        LOG.error("Exception encountered", e);
+        fail("Failed on iteration " + i);
+      }
+    }
+  }
+
+}


Mime
View raw message