hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject hbase git commit: HBASE-17837 Backport HBASE-15314 to branch-1.3 (Chunhui shen & Ram))
Date Thu, 30 Mar 2017 11:34:47 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 d0139a877 -> 589a0e2ef


HBASE-17837 Backport HBASE-15314 to branch-1.3	(Chunhui shen & Ram))


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/589a0e2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/589a0e2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/589a0e2e

Branch: refs/heads/branch-1
Commit: 589a0e2efb7b604d5f13f186143445b57438db0b
Parents: d0139a8
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Thu Mar 30 17:03:36 2017 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Thu Mar 30 17:03:36 2017 +0530

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      |  11 +-
 .../hbase/io/hfile/bucket/FileIOEngine.java     | 177 +++++++++++++++----
 .../hbase/io/hfile/bucket/TestFileIOEngine.java |  47 ++++-
 3 files changed, 187 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/589a0e2e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index a7ac70c..9c075c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -309,9 +309,14 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
       throws IOException {
-    if (ioEngineName.startsWith("file:"))
-      return new FileIOEngine(ioEngineName.substring(5), capacity);
-    else if (ioEngineName.startsWith("offheap"))
+    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
+      // In order to make the usage simple, we only need the prefix 'files:' in
+      // document whether one or multiple file(s), but also support 'file:' for
+      // the compatibility
+      String[] filePaths =
+          ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
+      return new FileIOEngine(capacity, filePaths);
+    } else if (ioEngineName.startsWith("offheap"))
       return new ByteBufferIOEngine(capacity, true);
     else if (ioEngineName.startsWith("heap"))
       return new ByteBufferIOEngine(capacity, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/589a0e2e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 7b6b25f..a7d6956 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -18,10 +18,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,38 +36,52 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Private
 public class FileIOEngine implements IOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
-  private final RandomAccessFile raf;
-  private final FileChannel fileChannel;
-  private final String path;
-  private long size;
-
-  public FileIOEngine(String filePath, long fileSize) throws IOException {
-    this.path = filePath;
-    this.size = fileSize;
-    try {
-      raf = new RandomAccessFile(filePath, "rw");
-    } catch (java.io.FileNotFoundException fex) {
-      LOG.error("Can't create bucket cache file " + filePath, fex);
-      throw fex;
-    }
+  public static final String FILE_DELIMITER = ",";
+  private final String[] filePaths;
+  private final FileChannel[] fileChannels;
+  private final RandomAccessFile[] rafs;
 
-    try {
-      raf.setLength(fileSize);
-    } catch (IOException ioex) {
-      LOG.error("Can't extend bucket cache file; insufficient space for "
-          + StringUtils.byteDesc(fileSize), ioex);
-      raf.close();
-      throw ioex;
-    }
+  private final long sizePerFile;
+  private final long capacity;
+
+  private FileReadAccessor readAccessor = new FileReadAccessor();
+  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
 
-    fileChannel = raf.getChannel();
-    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+  public FileIOEngine(long capacity, String... filePaths) throws IOException {
+    this.sizePerFile = capacity / filePaths.length;
+    this.capacity = this.sizePerFile * filePaths.length;
+    this.filePaths = filePaths;
+    this.fileChannels = new FileChannel[filePaths.length];
+    this.rafs = new RandomAccessFile[filePaths.length];
+    for (int i = 0; i < filePaths.length; i++) {
+      String filePath = filePaths[i];
+      try {
+        rafs[i] = new RandomAccessFile(filePath, "rw");
+        long totalSpace = new File(filePath).getTotalSpace();
+        if (totalSpace < sizePerFile) {
+          // The next setting length will throw exception,logging this message
+          // is just used for the detail reason of exception，
+          String msg = "Only " + StringUtils.byteDesc(totalSpace)
+              + " total space under " + filePath + ", not enough for requested "
+              + StringUtils.byteDesc(sizePerFile);
+          LOG.warn(msg);
+        }
+        rafs[i].setLength(sizePerFile);
+        fileChannels[i] = rafs[i].getChannel();
+        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+            + ", on the path:" + filePath);
+      } catch (IOException fex) {
+        LOG.error("Failed allocating cache on " + filePath, fex);
+        shutdown();
+        throw fex;
+      }
+    }
   }
 
   @Override
   public String toString() {
-    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
-      ", size=" + String.format("%,d", this.size);
+    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
   }
 
   /**
@@ -86,7 +102,7 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public int read(ByteBuffer dstBuffer, long offset) throws IOException {
-    return fileChannel.read(dstBuffer, offset);
+    return accessFile(readAccessor, dstBuffer, offset);
   }
 
   /**
@@ -97,7 +113,7 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    fileChannel.write(srcBuffer, offset);
+    accessFile(writeAccessor, srcBuffer, offset);
   }
 
   /**
@@ -106,7 +122,16 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void sync() throws IOException {
-    fileChannel.force(true);
+    for (int i = 0; i < fileChannels.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].force(true);
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failed syncing data to " + this.filePaths[i]);
+        throw ie;
+      }
+    }
   }
 
   /**
@@ -114,15 +139,93 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void shutdown() {
-    try {
-      fileChannel.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
+    for (int i = 0; i < filePaths.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].close();
+        }
+        if (rafs[i] != null) {
+          rafs[i].close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
+      }
+    }
+  }
+
+  private int accessFile(FileAccessor accessor, ByteBuffer buffer, long globalOffset)
+      throws IOException {
+    int startFileNum = getFileNum(globalOffset);
+    int remainingAccessDataLen = buffer.remaining();
+    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
+    int accessFileNum = startFileNum;
+    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
+    int bufLimit = buffer.limit();
+    while (true) {
+      FileChannel fileChannel = fileChannels[accessFileNum];
+      if (endFileNum > accessFileNum) {
+        // short the limit;
+        buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
+      }
+      int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+      // recover the limit
+      buffer.limit(bufLimit);
+      if (accessLen < remainingAccessDataLen) {
+        remainingAccessDataLen -= accessLen;
+        accessFileNum++;
+        accessOffset = 0;
+      } else {
+        break;
+      }
+      if (accessFileNum >= fileChannels.length) {
+        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
+            + globalOffset);
+      }
+    }
+    return bufLimit;
+  }
+
+  /**
+   * Get the absolute offset in given file with the relative global offset.
+   * @param fileNum
+   * @param globalOffset
+   * @return the absolute offset
+   */
+  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
+    return globalOffset - fileNum * sizePerFile;
+  }
+
+  private int getFileNum(long offset) {
+    if (offset < 0) {
+      throw new IllegalArgumentException("Unexpected offset " + offset);
+    }
+    int fileNum = (int) (offset / sizePerFile);
+    if (fileNum >= fileChannels.length) {
+      throw new RuntimeException("Not expected offset " + offset + " where capacity=" + capacity);
+    }
+    return fileNum;
+  }
+
+  private static interface FileAccessor {
+    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+        throws IOException;
+  }
+
+  private static class FileReadAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+        throws IOException {
+      return fileChannel.read(byteBuffer, accessOffset);
     }
-    try {
-      raf.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
+  }
+
+  private static class FileWriteAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+        throws IOException {
+      return fileChannel.write(byteBuffer, accessOffset);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/589a0e2e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 5f46681..8c71c09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
@@ -35,13 +37,39 @@ import org.junit.experimental.categories.Category;
 public class TestFileIOEngine {
   @Test
   public void testFileIOEngine() throws IOException {
-    int size = 2 * 1024 * 1024; // 2 MB
-    String filePath = "testFileIOEngine";
+    long totalCapacity = 6 * 1024 * 1024; // 6 MB
+    String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
+        "testFileIOEngine3" };
+    long sizePerFile = totalCapacity / filePaths.length; // 2 MB per File
+    List<Long> boundaryStartPositions = new ArrayList<Long>();
+    boundaryStartPositions.add(0L);
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStartPositions.add(sizePerFile * i - 1);
+      boundaryStartPositions.add(sizePerFile * i);
+      boundaryStartPositions.add(sizePerFile * i + 1);
+    }
+    List<Long> boundaryStopPositions = new ArrayList<Long>();
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStopPositions.add(sizePerFile * i - 1);
+      boundaryStopPositions.add(sizePerFile * i);
+      boundaryStopPositions.add(sizePerFile * i + 1);
+    }
+    boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
+    FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
     try {
-      FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
-      for (int i = 0; i < 50; i++) {
+      for (int i = 0; i < 500; i++) {
         int len = (int) Math.floor(Math.random() * 100);
-        long offset = (long) Math.floor(Math.random() * size % (size - len));
+        long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
+        if (i < boundaryStartPositions.size()) {
+          // make the boundary start positon
+          offset = boundaryStartPositions.get(i);
+        } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
+          // make the boundary stop positon
+          offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
+        } else if (i % 2 == 0) {
+          // make the cross-files block writing/reading
+          offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
+        }
         byte[] data1 = new byte[len];
         for (int j = 0; j < data1.length; ++j) {
           data1[j] = (byte) (Math.random() * 255);
@@ -54,9 +82,12 @@ public class TestFileIOEngine {
         }
       }
     } finally {
-      File file = new File(filePath);
-      if (file.exists()) {
-        file.delete();
+      fileIOEngine.shutdown();
+      for (String filePath : filePaths) {
+        File file = new File(filePath);
+        if (file.exists()) {
+          file.delete();
+        }
       }
     }
 


Mime
View raw message