hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zy...@apache.org
Subject hbase git commit: HBASE-20204 Add locking to RefreshFileConnections in BucketCache
Date Wed, 09 May 2018 21:15:21 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 9d9d5aec0 -> 5406f6333


HBASE-20204 Add locking to RefreshFileConnections in BucketCache

This is a follow-up to HBASE-20141 where Anoop suggested adding locking
for refreshing channels.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5406f633
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5406f633
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5406f633

Branch: refs/heads/branch-1
Commit: 5406f6333216f7bdbf4422aa856824d02da4c2a2
Parents: 9d9d5ae
Author: Zach York <zyork@amazon.com>
Authored: Wed Mar 14 15:38:22 2018 -0700
Committer: Zach York <zyork@amazon.com>
Committed: Wed May 9 14:15:02 2018 -0700

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/FileIOEngine.java     | 32 +++++++++++++++-----
 .../hbase/io/hfile/bucket/TestFileIOEngine.java | 19 +++++++++---
 2 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5406f633/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index ddefa85..7d3a9fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -43,6 +44,7 @@ public class FileIOEngine implements IOEngine {
   private final String[] filePaths;
   private final FileChannel[] fileChannels;
   private final RandomAccessFile[] rafs;
+  private final ReentrantLock[] channelLocks;
 
   private final long sizePerFile;
   private final long capacity;
@@ -56,6 +58,7 @@ public class FileIOEngine implements IOEngine {
     this.filePaths = filePaths;
     this.fileChannels = new FileChannel[filePaths.length];
     this.rafs = new RandomAccessFile[filePaths.length];
+    this.channelLocks = new ReentrantLock[filePaths.length];
     for (int i = 0; i < filePaths.length; i++) {
       String filePath = filePaths[i];
       try {
@@ -71,6 +74,7 @@ public class FileIOEngine implements IOEngine {
         }
         rafs[i].setLength(sizePerFile);
         fileChannels[i] = rafs[i].getChannel();
+        channelLocks[i] = new ReentrantLock();
         LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
             + ", on the path:" + filePath);
       } catch (IOException fex) {
@@ -193,8 +197,7 @@ public class FileIOEngine implements IOEngine {
       } catch (ClosedByInterruptException e) {
         throw e;
       } catch (ClosedChannelException e) {
-        LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
-        refreshFileConnection(accessFileNum);
+        refreshFileConnection(accessFileNum, e);
         continue;
       }
       // recover the limit
@@ -242,13 +245,26 @@ public class FileIOEngine implements IOEngine {
   }
 
   @VisibleForTesting
-  void refreshFileConnection(int accessFileNum) throws IOException {
-    FileChannel fileChannel = fileChannels[accessFileNum];
-    if (fileChannel != null) {
-      fileChannel.close();
+  void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
+    ReentrantLock channelLock = channelLocks[accessFileNum];
+    channelLock.lock();
+    try {
+      FileChannel fileChannel = fileChannels[accessFileNum];
+      if (fileChannel != null) {
+        // Don't re-open a channel if we were waiting on another
+        // thread to re-open the channel and it is now open.
+        if (fileChannel.isOpen()) {
+          return;
+        }
+        fileChannel.close();
+      }
+      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
+          + filePaths[accessFileNum], ioe);
+      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
+      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
+    } finally{
+      channelLock.unlock();
     }
-    rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
-    fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
   }
 
   private static interface FileAccessor {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5406f633/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 8c2bc6e..6e677d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -19,7 +19,8 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
@@ -134,10 +135,18 @@ public class TestFileIOEngine {
   }
 
   @Test
-  public void testRefreshFileConnectionClosesConnections() throws IOException {
-    FileChannel fileChannel = fileIOEngine.getFileChannels()[0];
+  public void testRefreshFileConnection() throws IOException {
+    FileChannel[] fileChannels = fileIOEngine.getFileChannels();
+    FileChannel fileChannel = fileChannels[0];
     assertNotNull(fileChannel);
-    fileIOEngine.refreshFileConnection(0);
-    assertFalse(fileChannel.isOpen());
+    fileChannel.close();
+    fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
+    FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
+    FileChannel reopenedFileChannel = reopenedFileChannels[0];
+    assertNotEquals(fileChannel, reopenedFileChannel);
+    assertEquals(fileChannels.length, reopenedFileChannels.length);
+    for (int i = 1; i < fileChannels.length; i++) {
+      assertEquals(fileChannels[i], reopenedFileChannels[i]);
+    }
   }
 }


Mime
View raw message