hbase-commits mailing list archives

From bus...@apache.org
Subject [1/4] hbase git commit: HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT
Date Tue, 20 Jun 2017 07:54:57 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 c1289960d -> 4160f7273


HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT

Signed-off-by: Andrew Purtell <apurtell@apache.org>

 Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2bde1ac4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2bde1ac4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2bde1ac4

Branch: refs/heads/branch-1.2
Commit: 2bde1ac4050eeaaa262c41459a6590359e69d78c
Parents: c128996
Author: Ashish Singhi <ashishsinghi@apache.org>
Authored: Tue Jun 6 17:49:08 2017 +0530
Committer: Sean Busbey <busbey@apache.org>
Committed: Tue Jun 20 02:49:12 2017 -0500

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      | 71 +++++++++++++++++++-
 .../hadoop/hbase/io/HalfStoreFileReader.java    |  4 ++
 .../hbase/io/hfile/AbstractHFileReader.java     |  8 +++
 .../org/apache/hadoop/hbase/io/hfile/HFile.java | 26 +++++--
 .../hadoop/hbase/io/hfile/HFileBlock.java       | 21 +++++-
 .../hadoop/hbase/io/hfile/HFileReaderV2.java    |  5 ++
 .../hadoop/hbase/io/hfile/HFileScanner.java     |  5 ++
 .../hbase/regionserver/StoreFileScanner.java    |  5 +-
 8 files changed, 136 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..dc168da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -18,7 +18,12 @@
 package org.apache.hadoop.hbase.io;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +37,8 @@ import com.google.common.annotations.VisibleForTesting;
  * see method comments.
  */
 public class FSDataInputStreamWrapper {
+  private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
+
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
@@ -74,6 +81,11 @@ public class FSDataInputStreamWrapper {
   // reads without hbase checksum verification.
   private volatile int hbaseChecksumOffCount = -1;
 
+  private Boolean instanceOfCanUnbuffer = null;
+  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation
+  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
+  private Method unbuffer = null;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, null, path, false);
   }
@@ -219,4 +231,61 @@ public class FSDataInputStreamWrapper {
   public HFileSystem getHfs() {
     return this.hfs;
   }
-}
+
+  /**
+   * This will free sockets and file descriptors held by the stream only when the stream implements
+   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
+   * using this stream to read the blocks have finished reading. If by chance the stream is
+   * unbuffered and there are clients still holding this stream for read then on next client read
+   * request a new socket will be opened by Datanode without client knowing about it and will serve
+   * its read request. Note: If this socket is idle for some time then the DataNode will close the
+   * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
+   * stream, the current socket will be closed and a new socket will be opened to serve the
+   * requests.
+   */
+  @SuppressWarnings({ "rawtypes" })
+  public void unbuffer() {
+    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
+    if (stream != null) {
+      InputStream wrappedStream = stream.getWrappedStream();
+      // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
+      // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
+      // CanUnbuffer interface or not and based on that call the unbuffer api.
+      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
+      if (this.instanceOfCanUnbuffer == null) {
+        // To ensure we compute whether the stream is instance of CanUnbuffer only once.
+        this.instanceOfCanUnbuffer = false;
+        Class<?>[] streamInterfaces = streamClass.getInterfaces();
+        for (Class c : streamInterfaces) {
+          if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
+            try {
+              this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
+            } catch (NoSuchMethodException | SecurityException e) {
+              LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+                  + " . So there may be a TCP socket connection "
+                  + "left open in CLOSE_WAIT state.",
+                e);
+              return;
+            }
+            this.instanceOfCanUnbuffer = true;
+            break;
+          }
+        }
+      }
+      if (this.instanceOfCanUnbuffer) {
+        try {
+          this.unbuffer.invoke(wrappedStream);
+        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+          LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass
+              + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.",
+            e);
+        }
+      } else {
+        LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+            + " . So there may be a TCP socket connection "
+            + "left open in CLOSE_WAIT state. For more details check "
+            + "https://issues.apache.org/jira/browse/HBASE-9393");
+      }
+    }
+  }
+}
\ No newline at end of file

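For context, here is a minimal standalone sketch of the reflection probe the wrapper performs above. The class and method names (UnbufferProbe, findUnbuffer) are illustrative only, not part of the patch; only the JDK is assumed.

import java.io.InputStream;
import java.lang.reflect.Method;

public class UnbufferProbe {
  /**
   * Returns the unbuffer() Method if the wrapped stream directly implements
   * org.apache.hadoop.fs.CanUnbuffer, otherwise null. Comparing the interface name as a
   * string avoids a compile-time dependency on Hadoop 2.6.4+/2.7.1+, as in the patch.
   */
  static Method findUnbuffer(InputStream wrapped) {
    for (Class<?> iface : wrapped.getClass().getInterfaces()) {
      if ("org.apache.hadoop.fs.CanUnbuffer".equals(iface.getCanonicalName())) {
        try {
          return wrapped.getClass().getDeclaredMethod("unbuffer");
        } catch (NoSuchMethodException | SecurityException e) {
          return null; // interface is present but the method is not reachable
        }
      }
    }
    return null;
  }
}
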
http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index ed2e925..c259fc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -322,6 +322,10 @@ public class HalfStoreFileReader extends StoreFile.Reader {
       public Cell getNextIndexedKey() {
         return null;
       }
+
+      @Override
+      public void close() {
+      }
     };
   }
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
index 7d8b572..5039427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -332,6 +332,14 @@ public abstract class AbstractHFileReader
     public HFile.Reader getReader() {
       return reader;
     }
+
+    @Override
+    public void close() {
+      if (!pread) {
+        // For seek + pread stream socket should be closed when the scanner is closed. HBASE-9393
+        reader.unbufferStream();
+      }
+    }
   }
 
   /** For testing */

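The close() added above only unbuffers for seek + read scanners. A self-contained sketch of that rule follows; ScannerCloseRule, StreamReleaser and closeScanner are hypothetical stand-ins, not HBase API.

class ScannerCloseRule {
  /** Hypothetical stand-in for the HFile.Reader#unbufferStream() call used above. */
  interface StreamReleaser {
    void unbufferStream();
  }

  static void closeScanner(boolean pread, StreamReleaser reader) {
    if (!pread) {
      // A seek + read scanner keeps the positional stream (and its socket) pinned,
      // so release it when the scanner is closed (HBASE-9393).
      reader.unbufferStream();
    }
    // A pread scanner does not pin the wrapped stream the same way, so nothing is
    // released here.
  }
}
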
http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index a67bf8c..1ae05b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -454,6 +454,12 @@ public class HFile {
     boolean isPrimaryReplicaReader();
 
     void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
+
+    /**
+     * To close the stream's socket. Note: This can be concurrently called from multiple threads and
+     * implementation should take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -470,8 +476,8 @@ public class HFile {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
+      CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
     FixedFileTrailer trailer = null;
     try {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
@@ -492,10 +498,15 @@ public class HFile {
         LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
       }
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
+    } finally {
+      fsdis.unbuffer();
     }
   }
 
   /**
+   * The sockets and the file descriptors held by the method parameter
+   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
+   * that no other threads have access to the same passed reference.
    * @param fs A file system
    * @param path Path to HFile
    * @param fsdis a stream of path's file
@@ -519,7 +530,7 @@ public class HFile {
     } else {
       hfs = (HFileSystem)fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+    return openReader(path, fsdis, size, cacheConf, hfs, conf);
   }
 
   /**
@@ -534,18 +545,21 @@ public class HFile {
       FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
+    return openReader(path, stream, fs.getFileStatus(path).getLen(),
       cacheConf, stream.getHfs(), conf);
   }
 
   /**
-   * This factory method is used only by unit tests
+   * This factory method is used only by unit tests. <br/>
+   * The sockets and the file descriptors held by the method parameter
+   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
+   * that no other threads have access to the same passed reference.
    */
   static Reader createReaderFromStream(Path path,
       FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
       throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+    return openReader(path, wrapper, size, cacheConf, null, conf);
   }
 
   /**

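A minimal sketch of the resource-hygiene pattern openReader() now follows: whatever happens while the trailer is read, the finally block frees the socket the probe used. TrailerProbe and UnbufferableSource are hypothetical stand-ins, not the real FSDataInputStreamWrapper API.

import java.io.IOException;

class TrailerProbe {
  /** Hypothetical stand-in for the stream wrapper backing the trailer read. */
  interface UnbufferableSource {
    byte[] readTrailer() throws IOException;
    void unbuffer();
  }

  static byte[] readTrailerAndRelease(UnbufferableSource src) throws IOException {
    try {
      return src.readTrailer();   // may throw, e.g. on a corrupt trailer
    } finally {
      src.unbuffer();             // mirrors the new finally { fsdis.unbuffer(); } above
    }
  }
}
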
http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 351c3c4..751c667 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1327,6 +1327,12 @@ public class HFileBlock implements Cacheable {
 
     /** Get the default decoder for blocks from this file. */
     HFileBlockDecodingContext getDefaultBlockDecodingContext();
+
+    /**
+     * To close the stream's socket. Note: This can be concurrently called from multiple threads and
+     * implementation should take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -1348,7 +1354,7 @@ public class HFileBlock implements Cacheable {
     /** The path (if any) where this data is coming from */
     protected Path path;
 
-    private final Lock streamLock = new ReentrantLock();
+    protected final Lock streamLock = new ReentrantLock();
 
     /** The default buffer size for our buffered streams */
     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
@@ -1764,6 +1770,19 @@ public class HFileBlock implements Cacheable {
     }
 
     @Override
+    public void unbufferStream() {
+      // To handle concurrent reads, ensure that no other client is accessing the streams while we
+      // unbuffer it.
+      if (streamLock.tryLock()) {
+        try {
+          this.streamWrapper.unbuffer();
+        } finally {
+          streamLock.unlock();
+        }
+      }
+    }
+
+    @Override
     public String toString() {
       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
     }

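A minimal sketch, assuming only java.util.concurrent, of the best-effort cleanup pattern unbufferStream() uses above: if a concurrent reader already holds the stream lock, skip unbuffering instead of blocking the caller. The class and method names are illustrative.

import java.util.concurrent.locks.ReentrantLock;

class BestEffortCleanup {
  private final ReentrantLock streamLock = new ReentrantLock();

  void unbufferIfIdle(Runnable unbuffer) {
    // tryLock() is non-blocking: it returns false immediately if a reader is active,
    // in which case the socket is left alone until the next close attempt.
    if (streamLock.tryLock()) {
      try {
        unbuffer.run();   // e.g. streamWrapper.unbuffer()
      } finally {
        streamLock.unlock();
      }
    }
  }
}
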
http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index fd5fc02..3e6c673 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -1413,4 +1413,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
   boolean prefetchComplete() {
     return PrefetchExecutor.isCompleted(path);
   }
+
+  @Override
+  public void unbufferStream() {
+    fsBlockReader.unbufferStream();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 3e0f91f..b5f207c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -161,4 +161,9 @@ public interface HFileScanner {
    * @return the next key in the index (the key to seek to the next block)
    */
   Cell getNextIndexedKey();
+
+  /**
+   * Close the stream socket to handle RS CLOSE_WAIT. HBASE-9393
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 0785db1..a55c3da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -51,6 +51,7 @@ public class StoreFileScanner implements KeyValueScanner {
   private final StoreFile.Reader reader;
   private final HFileScanner hfs;
   private Cell cur = null;
+  private boolean closed = false;
 
   private boolean realSeekDone;
   private boolean delayedReseek;
@@ -252,8 +253,10 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   public void close() {
-    // Nothing to close on HFileScanner?
     cur = null;
+    if (closed) return;
+    closed = true;
+    this.hfs.close();
   }
 
   /**

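A self-contained sketch of the idempotent-close guard introduced above: close() may be called more than once, so a boolean flag ensures the wrapped scanner (and its stream socket) is released exactly once. GuardedCloser is an illustrative name, not part of the patch.

import java.io.Closeable;
import java.io.IOException;

class GuardedCloser implements Closeable {
  private final Closeable delegate;   // stands in for the wrapped HFileScanner
  private boolean closed = false;

  GuardedCloser(Closeable delegate) {
    this.delegate = delegate;
  }

  @Override
  public void close() throws IOException {
    if (closed) {
      return;           // already released; a repeated close() is a no-op
    }
    closed = true;
    delegate.close();   // frees the underlying stream exactly once
  }
}
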
