hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ssrungar...@apache.org
Subject hbase git commit: HBASE-14385 Close the sockets that is missing in connection closure.
Date Thu, 10 Sep 2015 07:19:00 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 1e91cd2b2 -> d1d403d97


HBASE-14385 Close the sockets that is missing in connection closure.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d1d403d9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d1d403d9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d1d403d9

Branch: refs/heads/branch-1.1
Commit: d1d403d9723b2228d89940ac03f40fcbe3d71791
Parents: 1e91cd2
Author: Srikanth Srungarapu <ssrungarapu@cloudera.com>
Authored: Wed Sep 9 23:39:05 2015 -0700
Committer: Srikanth Srungarapu <ssrungarapu@cloudera.com>
Committed: Thu Sep 10 00:08:36 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |   9 ++
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java    | 115 +++++++++++++++++++
 2 files changed, 124 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d1d403d9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index cb84bde..567e4b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -869,6 +869,15 @@ public class RpcClientImpl extends AbstractRpcClient {
       }
       IOUtils.closeStream(in);
       this.in = null;
+      if (this.socket != null) {
+        try {
+          this.socket.close();
+          this.socket = null;
+        } catch (IOException e) {
+          LOG.error("Error while closing socket", e);
+        }
+      }
+
       disposeSasl();
 
       // log the info

http://git-wip-us.apache.org/repos/asf/hbase/blob/d1d403d9/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
new file mode 100644
index 0000000..2965055
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.*;
+
+@Category(SmallTests.class)
+public class TestRpcClientLeaks {
+
+  public static class MyRpcClientImpl extends RpcClientImpl {
+    public static List<Socket> savedSockets = Lists.newArrayList();
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    public MyRpcClientImpl(Configuration conf, String clusterId) {
+      super(conf, clusterId);
+    }
+
+    public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address) {
+      super(conf, clusterId, address);
+    }
+
+    @Override
+    protected Connection createConnection(ConnectionId remoteId, Codec codec,
+        CompressionCodec compressor) throws IOException {
+      return new Connection(remoteId, codec, compressor) {
+        @Override
+        protected synchronized void setupConnection() throws IOException {
+          super.setupConnection();
+          synchronized (savedSockets) {
+            savedSockets.add(socket);
+          }
+          throw new IOException("Sample exception for " +
+            "verifying socket closure in case of exceptions.");
+        }
+      };
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  public static final Log LOG = LogFactory.getLog(TestRpcClientLeaks.class);
+
+  @Test(expected=RetriesExhaustedException.class)
+  public void testSocketClosed() throws IOException, InterruptedException {
+    String tableName = "testSocketClosed";
+    TableName name = TableName.valueOf(tableName);
+    UTIL.createTable(name, fam1).close();
+
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
+      MyRpcClientImpl.class.getName());
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    Connection connection = ConnectionFactory.createConnection(conf);
+    Table table = connection.getTable(TableName.valueOf(tableName));
+    table.get(new Get("asd".getBytes()));
+    connection.close();
+    for (Socket socket : MyRpcClientImpl.savedSockets) {
+      assertTrue("Socket + " +  socket + " is not closed", socket.isClosed());
+    }
+  }
+}
+


Mime
View raw message