hbase-commits mailing list archives

From ecl...@apache.org
Subject [13/19] hbase git commit: HBASE-12476 HydraBase consensus protocol
Date Tue, 25 Nov 2014 20:29:06 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/LogWriter.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/LogWriter.java
new file mode 100644
index 0000000..4a1947a
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/LogWriter.java
@@ -0,0 +1,201 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * LogWriter provides the write operations against a log file. This class is
+ * not thread safe, and there shall be ONLY one LogWriter per log file.
+ *
+ * It assumes the client writes the header of the file first, then appends or
+ * truncates commits, and finally closes the writer.
+ *
+ * Here is the log file format:
+ * ------------------
+ * | File Header:   |
+ * |  Version    4B |
+ * |  Term       8B |
+ * |  Index      8B |
+ * ------------------
+ * | Entry # 1      |
+ * |  Entry Header  |
+ * |   Size     4B  |
+ * |   CRC      8B  |
+ * |   Index    8B  |
+ * |  Entry PayLoad |
+ * ------------------
+ * | Entry # 2      |
+ * |  Entry Header  |
+ * |   Size     4B  |
+ * |   CRC      8B  |
+ * |   Index    8B  |
+ * |  Entry PayLoad |
+ * ------------------
+ *     .....
+ * ------------------
+ * | Entry # N      |
+ * |  Entry Header  |
+ * |   Size     4B  |
+ * |   CRC      8B  |
+ * |   Index    8B  |
+ * |  Entry PayLoad |
+ * ------------------
+ */
+@NotThreadSafe
+public class LogWriter {
+
+  private final Logger LOG = LoggerFactory.getLogger(LogWriter.class);
+
+  private static final int CONSENSUS_LOG_DEFAULT_PAYLOAD_SIZE = 16 * 1024;
+  // The outside application should not have direct access to this object as
+  // the offset is maintained by the LogWriter
+  private final RandomAccessFile raf;
+  private final boolean isSync;
+  private long currOffset;
+  private ByteBuffer buffer = ByteBuffer.allocateDirect(
+    CONSENSUS_LOG_DEFAULT_PAYLOAD_SIZE);
+
+  /** The CRC instance to compute CRC-32 of a log entry payload */
+  private PureJavaCrc32 crc32 = new PureJavaCrc32();
+
+  public LogWriter(RandomAccessFile raf, boolean isSync) {
+    this.raf = raf;
+    this.isSync = isSync;
+    this.currOffset = 0;
+  }
+
+
+  public RandomAccessFile getRandomAccessFile() {
+    return raf;
+  }
+
+  /**
+   * Write the header data to the log file.
+   *
+   * @param term The term of the log file. Each log file only contains
+   *             transactions for this term.
+   * @param index The initial index for this log file.
+   * @throws IOException
+   */
+  public void writeFileHeader(long term, long index) throws IOException {
+    buffer.clear();
+
+    buffer.putInt(HConstants.RAFT_LOG_VERSION);
+
+    // Write the index to the buffer
+    buffer.putLong(term);
+    buffer.putLong(index);
+    buffer.flip();
+    currOffset += raf.getChannel().write(buffer);
+  }
+
+  public static ByteBuffer generateFileHeader(long term, long index) {
+    ByteBuffer bbuf = ByteBuffer.allocate(HConstants.RAFT_FILE_HEADER_SIZE);
+    bbuf.putInt(HConstants.RAFT_LOG_VERSION);
+    bbuf.putLong(term);
+    bbuf.putLong(index);
+    bbuf.flip();
+    return bbuf;
+  }
+
+  /**
+   * Append a specific commit (an index with its transactions) to the log file.
+   *
+   * @param index The index of the transactions
+   * @param data  The transaction list
+   * @return offset The file offset where this commit starts.
+   * @throws IOException
+   */
+  public long append(long index, ByteBuffer data) throws IOException {
+
+    // Get the current file offset right before this entry
+    long offset = currOffset;
+
+    ByteBuffer buffer = getBuffer(data.remaining() +
+      HConstants.RAFT_TXN_HEADER_SIZE);
+
+    // Clear the buffer
+    buffer.clear();
+
+    buffer.putInt(data.remaining());
+
+    // Update the CRC for the entry payload
+    this.crc32.reset();
+    this.crc32.update(data.array(), data.arrayOffset() + data.position(),
+      data.remaining());
+
+    // Write CRC value
+    buffer.putLong(crc32.getValue());
+
+    // Write the index to the buffer
+    buffer.putLong(index);
+
+    // Write the payload
+    buffer.put(data.array(), data.position() + data.arrayOffset(),
+      data.remaining());
+
+    // Flip the buffer to prepare it for the channel write
+    buffer.flip();
+
+    // Write the entry (header + payload) at the current offset
+    currOffset += raf.getChannel().write(buffer, currOffset);
+
+    buffer.clear();
+    // Sync the file if enabled
+    if (this.isSync) {
+      raf.getChannel().force(true);
+      raf.getFD().sync();
+    }
+
+    // Return the starting file offset before this entry
+    return offset;
+  }
+
+  /**
+   * Truncate the log from a specific offset
+   * @param offset
+   * @throws IOException
+   */
+  public void truncate(long offset) throws IOException {
+    this.raf.getChannel().truncate(offset);
+
+    // Need to always sync the data to the log file
+    this.raf.getChannel().force(true);
+    currOffset = offset;
+  }
+
+  /**
+   * Close the writer; no more write operations are allowed after the log is closed.
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    this.raf.close();
+  }
+
+  public long getCurrentPosition() {
+    return currOffset;
+  }
+
+  private ByteBuffer getBuffer(int payloadSize) {
+    if (buffer.capacity() >= payloadSize) {
+      return buffer;
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Allocating a new byte buffer of size " + payloadSize);
+    }
+    return ByteBuffer.allocate(payloadSize);
+  }
+}
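
For illustration, here is a minimal usage sketch of the writer above. It is not
part of this commit; the file path and payloads are hypothetical, and it assumes
the HConstants fields referenced by LogWriter are on the classpath.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

public class LogWriterUsageSketch {
  public static void main(String[] args) throws IOException {
    File logFile = new File("/tmp/log_example");  // hypothetical path
    LogWriter writer =
        new LogWriter(new RandomAccessFile(logFile, "rw"), true /* sync */);

    // The file header (version, term, index) must be written first.
    writer.writeFileHeader(1L, 0L);

    // append() returns the offset at which each entry starts. The payload
    // buffer must be heap-backed, since LogWriter reads its backing array
    // to compute the CRC.
    long first = writer.append(0L, ByteBuffer.wrap("txn-0".getBytes()));
    long second = writer.append(1L, ByteBuffer.wrap("txn-1".getBytes()));
    System.out.println("entries at offsets " + first + " and " + second);

    // Roll back the last entry by truncating at its starting offset.
    writer.truncate(second);

    writer.close();
  }
}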

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RandomAccessLog.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RandomAccessLog.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RandomAccessLog.java
new file mode 100644
index 0000000..840dbae
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RandomAccessLog.java
@@ -0,0 +1,451 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Arena;
+import org.apache.hadoop.hbase.util.MemoryBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The RandomAccessLog provides the random access interface for the
+ * transaction log. This class is not thread-safe in general.
+ *
+ * This class holds one LogWriter instance, to which the write operations
+ * are delegated, and it assumes a single thread accesses these write APIs.
+ * In addition, once the log file has been finalized for write, the file
+ * becomes immutable.
+ *
+ * This class also maintains a map of LogReader instances, to which the read
+ * operations are delegated. The map is indexed by the client's session key,
+ * and each client uses its own session key to access the read APIs.
+ * Concurrent access from different session keys is thread safe, but
+ * concurrent access from the same session key is NOT.
+ */
+@NotThreadSafe
+public class RandomAccessLog implements LogFileInterface {
+
+  public static final long UNKNOWN_CREATION_TIME = -1;
+
+  private static final Logger LOG = LoggerFactory.getLogger(RandomAccessLog.class);
+
+  /** It maps from the index to its starting file offset. */
+  private final ConcurrentHashMap<Long, Long> indexToOffsetMap;
+
+  private volatile long firstIndex = Long.MAX_VALUE;
+
+  private volatile long lastIndex = Long.MIN_VALUE;
+
+  /** The actual file */
+  private File file;
+
+  /** The LogWriter instance, which delegates the write operation. */
+  private final LogWriter writer;
+
+  /** Indicates whether the log has been finalized for write operation. */
+  private boolean isFinalized = false;
+
+  /** It maps from the reader session key to the LogReader instance. */
+  private final Map<String, LogReader> readerMap = new ConcurrentHashMap<>();
+
+  /** The current term of the log file. Each log file only has one term. */
+  private volatile long currentTerm = HConstants.UNDEFINED_TERM_INDEX;
+
+  /** Creation Time */
+  private final long creationTime;
+
+  /** only here to support mocking */
+  protected RandomAccessLog() {
+    file = null;
+    writer = null;
+    indexToOffsetMap = new ConcurrentHashMap<Long, Long>();
+    creationTime = UNKNOWN_CREATION_TIME;
+  }
+
+  public RandomAccessLog(File file, boolean isSync) throws IOException {
+    this(file, new RandomAccessFile(file, "rw"), isSync);
+  }
+
+  public RandomAccessLog(File file, RandomAccessFile raf, boolean isSync)
+    throws IOException {
+    this.creationTime = populateCreationTime(file);
+    this.file = file;
+    this.writer = new LogWriter(raf, isSync);
+    this.indexToOffsetMap = new ConcurrentHashMap<Long, Long>();
+  }
+
+  public RandomAccessFile getRandomAccessFile() {
+    return writer.getRandomAccessFile();
+  }
+
+  /**
+   * Append the term, index and its transactions to the commit log. If this
+   * is the first entry of the log, a file header is written as well.
+   *
+   * @param term
+   * @param index
+   * @param data
+   * @return offset the starting offset for this commit entry
+   * @throws IOException
+   */
+  public long append(long term, long index, final ByteBuffer data) throws IOException {
+    try {
+      // initialize the file
+      if (!isInitialized(term)) {
+        initialize(term, index);
+      }
+
+      // Append transactions and get the offset.
+      long offset = this.writer.append(index, data);
+      updateIndexMap(index, offset);
+
+      // Return the starting offset for this entry
+      return offset;
+    } catch (Exception e) {
+      // TODO:
+      LOG.error("Cannot append to the transaction log ", e);
+      throw e;
+    }
+  }
+
+  private void updateIndexMap(long index, long offset) {
+    if (index < this.firstIndex) {
+      this.firstIndex = index;
+    }
+
+    if (index > this.lastIndex) {
+      this.lastIndex = index;
+    }
+
+    // Update the index to offset map
+    indexToOffsetMap.put(index, offset);
+  }
+
+  /**
+   * Truncate the log, removing all entries whose index is greater than or
+   * equal to the given index.
+   *
+   * @param index
+   * @throws IOException
+   */
+  public void truncate(long index) throws IOException {
+    // Verify the term and index
+    Long offset = indexToOffsetMap.get(index);
+    if (offset == null) {
+      throw new IOException("No such index " + index + "in the current log");
+    }
+
+    // Truncate the file
+    try {
+      this.writer.truncate(offset);
+    } catch (IOException e) {
+      LOG.error("Cannot truncate to the transaction log ", e);
+      throw e;
+    }
+
+    // Update the meta data
+    removeIndexUpTo(index);
+  }
+
+  /**
+   * Remove all indexes greater than or equal to the given index.
+   * @param index
+   */
+  private void removeIndexUpTo(long index) {
+    if (index == this.firstIndex) {
+      // Reset all the meta data
+      this.indexToOffsetMap.clear();
+      firstIndex = Long.MAX_VALUE;
+      lastIndex = Long.MIN_VALUE;
+      return;
+    }
+
+    // Iterate over the indexToOffsetMap
+    for (Long key : this.indexToOffsetMap.keySet()) {
+      if (key >= index) {
+        this.indexToOffsetMap.remove(key);
+      }
+    }
+
+    // Update the lastIndex correctly
+    this.lastIndex = index - 1;
+  }
+
+  /**
+   * Get the file offset of a transaction.
+   *
+   * @param term
+   * @param index
+   * @return the file offset of the transaction
+   */
+  public long getTransactionFileOffset(long term, long index)
+      throws NoSuchElementException {
+    // Sanity check the term and index
+    if (term != this.currentTerm || !this.indexToOffsetMap.containsKey(index)) {
+      throw new NoSuchElementException("No such index " + index +
+        " and term " + term + " in the current log " + toString());
+    }
+    return this.indexToOffsetMap.get(index);
+  }
+
+  /**
+   * Get the transactions for the given term, index and session key.
+   *
+   * @param term The term of the queried transaction.
+   * @param index The index of the queried transaction.
+   * @param sessionKey The session key of the reader.
+   * @param arena The arena used to allocate the returned buffer.
+   * @return transactions
+   * @throws NoSuchElementException if the term does not match the term of
+   * the log, or there is no such index in the log.
+   */
+  public MemoryBuffer getTransaction(long term, long index, String sessionKey,
+                             final Arena arena)
+    throws IOException, NoSuchElementException {
+    // Sanity check the term and index
+    if (term != this.currentTerm || !this.indexToOffsetMap.containsKey(index)) {
+      throw new NoSuchElementException("No such index " + index +
+        " and term " + term + " in the current log " + toString());
+    }
+
+    // Get the file offset from the map
+    long offset = this.indexToOffsetMap.get(index);
+
+    // Get the LogReader for this sessionKey
+    LogReader reader = this.getReader(sessionKey);
+
+    // Seek to the offset and read the transaction
+    return reader.seekAndRead(offset, index, arena);
+  }
+
+  @Override public long getLastModificationTime() {
+    return file.lastModified();
+  }
+
+  /**
+   * @return the absolute path of this log
+   */
+  public String getFileName() {
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Finalize the log file. The log file is immutable from then on.
+   * @throws IOException
+   */
+  public void finalizeForWrite() throws IOException {
+    // Close the writer
+    this.writer.close();
+
+    // Mark the file as finalized
+    isFinalized = true;
+  }
+
+  /**
+   * Delete the current log file
+   * @throws IOException
+   */
+  @Override
+  public void closeAndDelete() throws IOException {
+    try {
+      finalizeForWrite();
+      removeAllReaders();
+    } catch (IOException e) {
+      LOG.error("Cannot close the transaction log ", e);
+    } finally {
+      file.delete();
+    }
+  }
+
+  /**
+   * @return true if the log file has been finalized
+   */
+  public boolean isFinalized() {
+    return isFinalized;
+  }
+
+  /**
+   * @return the number of entries in the log
+   */
+  public long getTxnCount() {
+    return this.indexToOffsetMap.isEmpty() ? 0 : (lastIndex - firstIndex + 1);
+  }
+
+  /**
+   * @return the initial index in the log
+   */
+  @Override
+  public long getInitialIndex() {
+    if (indexToOffsetMap.isEmpty()) {
+      return HConstants.UNDEFINED_TERM_INDEX;
+    }
+    return this.firstIndex;
+  }
+
+  /**
+   * @return the last index in the log
+   */
+  @Override
+  public long getLastIndex() {
+    if (indexToOffsetMap.isEmpty()) {
+      return HConstants.UNDEFINED_TERM_INDEX;
+    }
+    return this.lastIndex;
+  }
+
+  /**
+   * @return the current term of the log file
+   */
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+
+  public File getFile() {
+    return file;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("RandomAccessLog{");
+    sb.append("term=").append(getCurrentTerm())
+      .append(", index=[").append(getInitialIndex())
+      .append(", ").append(getLastIndex()).append("]")
+      .append(", path=").append(getFileName())
+      .append(", ntxns=").append(getTxnCount())
+      .append(", finalized=").append(isFinalized())
+      .append('}');
+    return sb.toString();
+  }
+
+
+  /**
+   * Remove the registered reader for the given session key from the reader
+   * map and close it.
+   * @param sessionKey
+   */
+  public void removeReader(String sessionKey) throws IOException {
+    LogReader reader = this.readerMap.remove(sessionKey);
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  public void removeAllReaders() throws IOException {
+    synchronized (this) {
+      for (LogReader reader : readerMap.values()) {
+        reader.close();
+      }
+      readerMap.clear();
+    }
+  }
+
+  /**
+   * Rebuilds the in-memory index=>offset map by scanning the file.
+   * @param sessionKey
+   * @throws IOException
+   */
+  public void rebuild(final String sessionKey) throws IOException {
+    LogReader reader = getReader(sessionKey);
+    long index, lastKnownGoodOffset;
+
+    currentTerm = reader.getCurrentTerm();
+    if (reader.getVersion() != HConstants.RAFT_LOG_VERSION) {
+      throw new IOException("Unable to verify the version.");
+    }
+    while (reader.hasMore()) {
+      index = reader.next();
+      if (index == HConstants.UNDEFINED_TERM_INDEX) {
+        break;
+      }
+
+      updateIndexMap(index, reader.getCurrentIndexFileOffset());
+    }
+
+    lastKnownGoodOffset = reader.getCurrentPosition();
+    LOG.debug("Resetting the write offset for " + reader.getFile().getAbsoluteFile() +
+      " to " + lastKnownGoodOffset);
+
+    // truncate the entries from the last known index
+    writer.truncate(lastKnownGoodOffset);
+
+    // Reset the reader position as we are done with the rebuild
+    reader.resetPosition();
+  }
+
+  /**
+   * A utility function to retrieve the log reader for the given session key,
+   * creating and registering one if absent.
+   * @param sessionKey
+   * @return the LogReader for this session key
+   * @throws IOException
+   */
+  private LogReader getReader(String sessionKey) throws IOException {
+    LogReader reader = this.readerMap.get(sessionKey);
+    if (reader == null) {
+      synchronized (this) {
+        reader = this.readerMap.get(sessionKey);
+        if (reader == null) {
+          // Initialize the reader
+          reader = new LogReader(this.file);
+          reader.initialize();
+
+          // Add the reader to the map
+          this.readerMap.put(sessionKey, reader);
+        }
+      }
+    }
+    return reader;
+  }
+
+  private boolean isInitialized(long term) {
+    if (currentTerm == HConstants.UNDEFINED_TERM_INDEX) {
+      return false;
+    } else if (currentTerm == term) {
+      return true;
+    } else {
+      throw new IllegalArgumentException("Expect the same currentTerm (" + currentTerm
+        + " ) for each commit log file. Requested term : " + term);
+    }
+  }
+
+  private void initialize(long term, long index) throws IOException {
+    this.writer.writeFileHeader(term, index);
+    // Update the meta data;
+    this.currentTerm = term;
+  }
+
+  @Override
+  public long getFileSize() {
+    return file.length();
+  }
+
+  @Override
+  public String getFileAbsolutePath() {
+    return file.getAbsolutePath();
+  }
+
+  @Override public long getCreationTime() {
+    return creationTime;
+  }
+
+  public static long populateCreationTime(final File file) {
+    try {
+      BasicFileAttributes attributes =
+        Files.readAttributes(file.toPath(), BasicFileAttributes.class);
+      return attributes.creationTime().toMillis();
+    } catch (IOException e) {
+      LOG.warn("Unable to read the creation time of " + file, e);
+    }
+    return UNKNOWN_CREATION_TIME;
+  }
+}
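
A sketch of the write-side life cycle of RandomAccessLog, illustrative only
(the path is hypothetical): the first append writes the file header and pins
the term of the file.

import java.io.File;
import java.nio.ByteBuffer;

public class RandomAccessLogUsageSketch {
  public static void main(String[] args) throws Exception {
    RandomAccessLog log =
        new RandomAccessLog(new File("/tmp/log_example"), false /* isSync */);

    // The first append writes the header and fixes the term (2 here);
    // a later append with a different term throws IllegalArgumentException.
    for (long index = 100; index <= 102; index++) {
      log.append(2L, index, ByteBuffer.wrap(("txn-" + index).getBytes()));
    }
    System.out.println(log);  // term=2, index=[100, 102], ntxns=3

    // Drop entries 101 and 102, then finalize; the file is immutable afterwards.
    log.truncate(101L);
    log.finalizeForWrite();
  }
}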

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/ReadOnlyLog.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/ReadOnlyLog.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/ReadOnlyLog.java
new file mode 100644
index 0000000..4edc875
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/ReadOnlyLog.java
@@ -0,0 +1,185 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.hbase.util.Arena;
+import org.apache.hadoop.hbase.util.MemoryBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The ReadOnlyLog provides the read interface for the transaction log.
+ *
+ * This class also maintains a map of LogReader instances, to which the read
+ * operations are delegated. The map is indexed by the client's session key,
+ * and each client uses its own session key to access the read APIs.
+ * Concurrent access from different session keys is thread safe, but
+ * concurrent access from the same session key is NOT.
+ *
+ * The naming convention is "<term>_<initial-index>_<last-index>".
+ */
+public class ReadOnlyLog implements LogFileInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyLog.class);
+
+  protected final long initialIndex;
+  protected final long lastIndex;
+  protected final long currentTerm;
+  protected final File file;
+  protected final long creationTime;
+
+  private final Map<String, LogReader> readerMap = new ConcurrentHashMap<>();
+
+  public ReadOnlyLog(File file, long term, long initIndex) {
+    this.creationTime = RandomAccessLog.populateCreationTime(file);
+    this.file = file;
+    this.currentTerm = term;
+    this.initialIndex = initIndex;
+    this.lastIndex = Long.parseLong(file.getName().split("_")[2]);
+  }
+
+  public ReadOnlyLog(File file, String sessionKey) throws IOException {
+    this.creationTime = RandomAccessLog.populateCreationTime(file);
+
+    this.file = file;
+
+    // Get the LogReader
+    LogReader reader = getReader(sessionKey);
+
+    // Set up the invariants
+    this.currentTerm = reader.getCurrentTerm();
+    this.initialIndex = reader.getInitialIndex();
+    this.lastIndex = Long.parseLong(file.getName().split("_")[2]);
+  }
+
+  /**
+   * Read the transactions for the given index.
+   *
+   * @param term The term of the queried transaction
+   * @param index The query index
+   * @param sessionKey The session key of the log reader
+   * @param arena The arena used to allocate the returned buffer
+   * @return transactions of the given index
+   * @throws NoSuchElementException if the term does not match the term of the log.
+   */
+  public MemoryBuffer getTransaction(long term, long index, String sessionKey,
+                                     final Arena arena)
+    throws IOException, NoSuchElementException {
+    if (term != currentTerm) {
+      throw new NoSuchElementException("The term " + term +
+        " does not exist in this log file");
+    }
+    // Get the LogReader for this sessionKey
+    LogReader reader = this.getReader(sessionKey);
+    return reader.seekAndRead(index, arena);
+  }
+
+  @Override public long getLastModificationTime() {
+    return file.lastModified();
+  }
+
+  /**
+   * Remove the registered reader for the given session key from the reader
+   * map and close it.
+   * @param sessionKey
+   */
+  public void removeReader(String sessionKey) throws IOException {
+    LogReader reader = this.readerMap.remove(sessionKey);
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  /**
+   * @return the initial index of the log
+   */
+  @Override
+  public long getInitialIndex() {
+    return initialIndex;
+  }
+
+  /**
+   * @return the term of the log
+   */
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+
+  /**
+   * Close all the LogReader instances and delete the file
+   * @throws IOException
+   */
+  @Override
+  public void closeAndDelete() throws IOException {
+    for (String key : this.readerMap.keySet()) {
+      removeReader(key);
+    }
+    this.file.delete();
+  }
+
+  @Override public long getTxnCount() {
+    return this.getLastIndex() - this.getInitialIndex() + 1;
+  }
+
+  @Override public String getFileName() {
+    return file.getName();
+  }
+
+  @Override public File getFile() {
+    return file;
+  }
+
+  /**
+   * Returns the last index in this log.
+   *
+   * @return the last index
+   */
+  @Override
+  public long getLastIndex() {
+    return lastIndex;
+  }
+
+  private LogReader getReader(String sessionKey) throws IOException {
+    LogReader reader = this.readerMap.get(sessionKey);
+    if (reader == null) {
+      synchronized (this) {
+        reader = this.readerMap.get(sessionKey);
+        if (reader == null) {
+          // Initialize the reader
+          reader = new LogReader(this.file);
+          reader.initialize();
+
+          // Add the reader to the map
+          this.readerMap.put(sessionKey, reader);
+        }
+      }
+    }
+    return reader;
+  }
+
+  @Override
+  public long getFileSize() {
+    return file.length();
+  }
+
+  @Override
+  public String getFileAbsolutePath() {
+    return file.getAbsolutePath();
+  }
+
+  @Override public long getCreationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ReadOnlyLog{");
+    sb.append("term=").append(getCurrentTerm())
+      .append(", index=[").append(getInitialIndex())
+      .append(", ").append(getLastIndex()).append("]")
+      .append(", path=").append(getFileName())
+      .append(", ntxns=").append(getTxnCount())
+      .append('}');
+    return sb.toString();
+  }
+}
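
The index metadata of a finalized log can be recovered from its file name
alone. A small sketch of the "<term>_<initial-index>_<last-index>" convention
that the parsing above relies on (the name is made up):

public class LogNameSketch {
  public static void main(String[] args) {
    String name = "2_100_102";  // hypothetical: term 2, entries 100..102
    String[] parts = name.split("_");
    long term = Long.parseLong(parts[0]);          // 2
    long initialIndex = Long.parseLong(parts[1]);  // 100
    long lastIndex = Long.parseLong(parts[2]);     // 102, as ReadOnlyLog reads it
    System.out.println(term + " [" + initialIndex + ", " + lastIndex + "]");
  }
}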

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RemoteLogFetcher.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RemoteLogFetcher.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RemoteLogFetcher.java
new file mode 100644
index 0000000..dc3c30f
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/RemoteLogFetcher.java
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.consensus.client.FetchTask;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Generates a fetch plan from the local quorum context and drives the
+ * actual fetch work.
+ */
+public class RemoteLogFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteLogFetcher.class);
+
+  private RaftQuorumContext context;
+  QuorumClient quorumClient;
+
+  public RemoteLogFetcher() {}
+
+  public RemoteLogFetcher(RaftQuorumContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Create a fetch plan from local context and download log files.
+   */
+  public void doSpontaneousFetch() throws Exception {
+    initializeQuorumClients();
+    List<Pair<String, List<LogFileInfo>>> statuses =
+      getCommittedLogInfoFromAllPeers();
+    Collection<FetchTask> tasks = createFetchTasks(
+        statuses, context.getCommittedEdit().getIndex());
+    executeTasks(tasks, context.getQuorumName());
+  }
+
+  /**
+   * Instantiate connections to peers
+   */
+  public void initializeQuorumClients() throws IOException {
+    this.quorumClient =
+        new QuorumClient(context.getQuorumInfo(), context.getConf(),
+            context.getExecServiceForThriftClients());
+  }
+
+  /**
+   * Ask peers for information about committed log files whose index is
+   * greater than the latest locally committed index.
+   *
+   * @return each list item contains committed log info of a peer
+   */
+  public List<Pair<String, List<LogFileInfo>>> getCommittedLogInfoFromAllPeers()
+    throws Exception {
+    return getPeerCommittedLogStatus(context.getCommittedEdit().getIndex());
+  }
+
+  /**
+   * Ask peers for information about committed log files whose index is
+   * greater than a given index. It's only used by tests.
+   *
+   * @param minIndex
+   * @return each list item contains committed log info of a peer
+   */
+  protected List<Pair<String, List<LogFileInfo>>> getPeerCommittedLogStatus(
+      long minIndex) throws Exception {
+    return quorumClient.getPeerCommittedLogStatus(
+        context.getServerAddress(), context.getQuorumName(), minIndex);
+  }
+
+  protected Collection<FetchTask> createFetchTasks(
+      List<Pair<String, List<LogFileInfo>>> statuses, long minIndex) {
+    LogFetchPlanCreator planCreator = new LogFileInfoIterator(statuses, minIndex);
+    return planCreator.createFetchTasks();
+  }
+
+  /**
+   * Execute fetch tasks either generated locally or pushed from a remote server.
+   *
+   * @param tasks each task item contains the address of one peer and a list of
+   *              files to be downloaded from it
+   */
+  public ListenableFuture<Void> executeTasks(Collection<FetchTask> tasks,
+      String regionId) {
+    //TODO to be implemented in part 2
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/SeedLogFile.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/SeedLogFile.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/SeedLogFile.java
new file mode 100644
index 0000000..05f3c02
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/SeedLogFile.java
@@ -0,0 +1,52 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * This is a special log file used to denote the seed index for the
+ * transaction log manager.
+ */
+public class SeedLogFile extends ReadOnlyLog {
+
+  public SeedLogFile(final File file) {
+    super(file, HConstants.SEED_TERM, getInitialSeedIndex(file));
+  }
+
+  /**
+   * Remove the registered reader for the given session key. This is a
+   * no-op for seed log files.
+   * @param sessionKey
+   */
+  @Override
+  public void removeReader(String sessionKey) throws IOException {
+    // Do nothing
+  }
+
+  /**
+   * Close all the LogReader instances and delete the file
+   * @throws IOException
+   */
+  @Override
+  public void closeAndDelete() throws IOException {
+    super.closeAndDelete();
+  }
+
+  @Override public long getTxnCount() {
+    return lastIndex - initialIndex + 1;
+  }
+
+  public static boolean isSeedFile(final File f) {
+    return isSeedFile(f.getName());
+  }
+
+  public static boolean isSeedFile(String fileName) {
+    String[] split = fileName.split("_");
+    return split[0].equals("-2");
+  }
+
+  public static long getInitialSeedIndex(final File f) {
+    final String[] split = f.getName().split("_");
+    return Long.parseLong(split[1]);
+  }
+}
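
A quick sketch of how a seed file's name is interpreted, assuming
HConstants.SEED_TERM is the -2 that isSeedFile() checks for (the file name is
hypothetical):

import java.io.File;

public class SeedLogFileSketch {
  public static void main(String[] args) {
    // A seed file "-2_5000": term component "-2", seed index 5000.
    File f = new File("/tmp/-2_5000");
    System.out.println(SeedLogFile.isSeedFile(f));           // true
    System.out.println(SeedLogFile.getInitialSeedIndex(f));  // 5000
  }
}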

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/TransactionLogCreator.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/TransactionLogCreator.java
b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/TransactionLogCreator.java
new file mode 100644
index 0000000..a20bf70
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/log/TransactionLogCreator.java
@@ -0,0 +1,151 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Log creator which maintains a pool of log files readily available for use.
+ *
+ * TODO: move this to the ConsensusServer level to handle all quorums.
+ */
+public class TransactionLogCreator {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionLogCreator.class);
+
+  /** We use a single thread by default to create new log files for all quorums */
+  private static final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1,
+      new DaemonThreadFactory("TransactionLogCreator-"));
+
+  /** Queue of pre-created log files to draw from. */
+  private final LinkedBlockingQueue<RandomAccessLog> futureLogs;
+  private int maxEmptyLogFiles = HConstants.RAFT_MAX_NUM_NEW_LOGS;
+
+  /** Log directory where it should create the new files */
+  private final String logDirectory;
+
+  /** Stops the creator from issuing/creating new files */
+  private volatile boolean isClosed = false;
+
+  /** Tells whether to sync the data always on append or not */
+  private final boolean isSync;
+
+  private final Runnable refillTask = new Runnable() {
+    @Override
+    public void run() {
+      while (futureLogs.remainingCapacity() > 0 && !isClosed) {
+        try {
+          tryGenerateNewLogFile();
+        } catch (IOException e) {
+          LOG.error("Failed to create log file in " + logDirectory + " . Will retry in "
+
+              HConstants.RETRY_TRANSACTION_LOG_CREATION_DELAY_IN_SECS + " seconds.", e);
+          threadPool.schedule(this, HConstants.RETRY_TRANSACTION_LOG_CREATION_DELAY_IN_SECS,
TimeUnit.SECONDS);
+          break;
+        } catch (Throwable t) {
+          LOG.error("Failed to create log file in " + logDirectory +
+              " unexpectedly. Aborting!", t);
+          break;
+        }
+      }
+    }
+  };
+
+  // purely for mocking
+  public TransactionLogCreator(final String logDirectory) {
+    this(logDirectory, false, null);
+  }
+
+  public TransactionLogCreator(String logDirectory, boolean isSync,
+                                Configuration conf) {
+    if (conf != null) {
+      maxEmptyLogFiles = conf.getInt(HConstants.RAFT_MAX_NUM_NEW_LOGS_KEY,
+        HConstants.RAFT_MAX_NUM_NEW_LOGS);
+    }
+
+    StringBuilder logDirectoryBuilder = new StringBuilder(logDirectory);
+    if (!logDirectory.endsWith(HConstants.PATH_SEPARATOR)) {
+      logDirectoryBuilder.append(HConstants.PATH_SEPARATOR);
+    }
+    logDirectoryBuilder.append(HConstants.RAFT_CURRENT_LOG_DIRECTORY_NAME);
+    logDirectoryBuilder.append(HConstants.PATH_SEPARATOR);
+    this.logDirectory = logDirectoryBuilder.toString();
+
+    this.isSync = isSync;
+
+    futureLogs = new LinkedBlockingQueue<>(maxEmptyLogFiles);
+
+    refillFutureLogs();
+  }
+
+  /**
+   * Returns a new log file from the pool of pre-created files, blocking
+   * until one becomes available.
+   * @return the new log file, or null if the creator is closed
+   * @throws InterruptedException
+   */
+  public LogFileInterface getNewLogFile() throws InterruptedException {
+    if (isClosed) {
+      return null;
+    }
+
+    RandomAccessLog file = null;
+    while (file == null) {
+      file = futureLogs.poll(100, TimeUnit.MILLISECONDS);
+    }
+    refillFutureLogs();
+    return file;
+  }
+
+  /**
+   * Stops the creator and deletes all the pre-created files.
+   */
+  public void close() {
+    isClosed = true;
+    // Delete all the opened files.
+    RandomAccessLog ral;
+    while ((ral = futureLogs.poll()) != null) {
+      try {
+        ral.closeAndDelete();
+      } catch (IOException e) {
+        LOG.warn("Failed to delete log file in " + logDirectory, e);
+      }
+    }
+  }
+
+  /**
+   * Refill future logs queue
+   */
+  private void refillFutureLogs() {
+    threadPool.execute(refillTask);
+  }
+
+  private void tryGenerateNewLogFile() throws IOException {
+    String fileName = generateNewFileName();
+    File newFile = new File(logDirectory + fileName);
+    RandomAccessLog log = new RandomAccessLog(newFile, isSync);
+    if (!futureLogs.offer(log)) {
+      LOG.debug("Log pool for " + logDirectory + " is currently full");
+      // Close the underlying file handle before deleting the file
+      log.closeAndDelete();
+    }
+  }
+
+  private String generateNewFileName() {
+    return "log_" + System.nanoTime();
+  }
+
+  protected String getLogDirectory() {
+    return logDirectory;
+  }
+
+  public static void setMaxThreadNum(int maxThreadNum) {
+    // A ScheduledThreadPoolExecutor sizes its pool by corePoolSize;
+    // setMaximumPoolSize would have no effect on it.
+    ((ThreadPoolExecutor) threadPool).setCorePoolSize(maxThreadNum);
+  }
+}
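
A usage sketch for the creator, illustrative only; it assumes the quorum's log
directory (including the RAFT_CURRENT_LOG_DIRECTORY_NAME subdirectory) already
exists on disk.

import org.apache.hadoop.conf.Configuration;

public class TransactionLogCreatorUsageSketch {
  public static void main(String[] args) throws Exception {
    // Pre-creates up to maxEmptyLogFiles empty logs in the background.
    TransactionLogCreator creator =
        new TransactionLogCreator("/tmp/quorum-example", false, new Configuration());

    // Polls in 100 ms slices until a pre-created log is available, then
    // kicks off an asynchronous refill of the pool.
    LogFileInterface log = creator.getNewLogFile();
    System.out.println(log.getFileAbsolutePath());

    creator.close();  // deletes any remaining pre-created files
  }
}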

