hbase-commits mailing list archives

From: ecl...@apache.org
Subject: [02/19] hbase git commit: HBASE-12476 HydraBase consensus protocol
Date: Tue, 25 Nov 2014 20:28:55 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestLowerRankBecomingLeader.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestLowerRankBecomingLeader.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestLowerRankBecomingLeader.java
new file mode 100644
index 0000000..a72ebf9
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestLowerRankBecomingLeader.java
@@ -0,0 +1,124 @@
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.server
+  .InstrumentedConsensusServiceImpl;
+import org.apache.hadoop.hbase.consensus.server.LocalConsensusServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TestLowerRankBecomingLeader {
+  private static final Logger LOG = LoggerFactory.getLogger(
+    TestLowerRankBecomingLeader.class);
+  private static int QUORUM_SIZE = 3;
+  private static int QUORUM_MAJORITY = 2;
+  private static HRegionInfo regionInfo;
+  private static RaftTestUtil RAFT_TEST_UTIL = new RaftTestUtil();
+  private static QuorumClient client;
+  private static volatile int transactionNum = 0;
+  private ReplicationLoadForUnitTest loader;
+
+  @Before
+  public void setUp() throws Exception {
+    RAFT_TEST_UTIL.disableVerboseLogging();
+    RAFT_TEST_UTIL.createRaftCluster(QUORUM_SIZE);
+    RAFT_TEST_UTIL.setUsePeristentLog(true);
+    RAFT_TEST_UTIL.assertAllServersRunning();
+    regionInfo = RAFT_TEST_UTIL.initializePeers();
+    RAFT_TEST_UTIL.addQuorum(regionInfo, RAFT_TEST_UTIL.getScratchSetup(QUORUM_SIZE));
+    RAFT_TEST_UTIL.startQuorum(regionInfo);
+    client = RAFT_TEST_UTIL.getQuorumClient(regionInfo.getQuorumInfo());
+    transactionNum = 0;
+    loader = new ReplicationLoadForUnitTest(regionInfo, client, RAFT_TEST_UTIL, QUORUM_SIZE,
+      QUORUM_MAJORITY);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RAFT_TEST_UTIL.shutdown();
+  }
+
+  @Test(timeout=500000)
+  public void testSinglePeerFailureAndRecovery()
+    throws InterruptedException, IOException {
+    simulateFailureEvent(1);
+  }
+
+  private void simulateFailureEvent(final int failureInterval)
+    throws InterruptedException, IOException {
+    int failureCnt = 0;
+    final long sleepTime =
+      2 * HConstants.PROGRESS_TIMEOUT_INTERVAL_IN_MILLISECONDS * failureInterval;
+
+    // Start the client load
+    loader.startReplicationLoad(100);
+
+    // Let the traffic fly for a while
+    transactionNum = loader.makeProgress(sleepTime, transactionNum);
+
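+    // Drop every packet to the rank 2 peer so it falls behind while the rest
+    // of the quorum keeps committing new transactions.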
+    RAFT_TEST_UTIL.simulatePacketDropForServer(regionInfo, 2,
+        InstrumentedConsensusServiceImpl.PacketDropStyle.ALWAYS);
+
+    transactionNum = loader.makeProgress(sleepTime, transactionNum);
+
+    RAFT_TEST_UTIL.simulatePacketDropForServer(regionInfo, 2,
+        InstrumentedConsensusServiceImpl.PacketDropStyle.NONE);
+
+    // At this point, rank 1 and rank 3 peers are up to date
+
+    // Get the quorum contexts for the rank 3 and rank 2 peers
+    RaftQuorumContext c3 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 3);
+    RaftQuorumContext c2 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 2);
+
+    // Shut down the quorum member whose rank is 3
+    System.out.println("Stopping one quorum member: " + c3);
+    LocalConsensusServer s3 = RAFT_TEST_UTIL.stopLocalConsensusServer(regionInfo, 3);
+
+    // Let the traffic fly for a while
+    if ((++failureCnt % failureInterval) == 0) {
+      transactionNum = loader.makeProgress(sleepTime, transactionNum);
+      while(!c2.isLeader()) {
+        System.out.println("Wait for the rank 2 to take over the leadership");
+        Thread.sleep(sleepTime);
+      }
+      Assert.assertTrue("Rank 2 shall be the leader of the quorum", c2.isLeader());
+    }
+
+    // Restart the quorum member whose rank is 3
+    c3 = RAFT_TEST_UTIL.restartLocalConsensusServer(s3, regionInfo, c3.getMyAddress());
+    System.out.println("Restarted another quorum member: " + c3);
+
+    // Let the traffic fly for a while
+    if (++failureCnt % failureInterval == 0) {
+      transactionNum = loader.makeProgress(sleepTime, transactionNum);
+      while(!c3.isLeader()) {
+        System.out.println("Wait for the rank 3 to take over the leadership");
+        Thread.sleep(sleepTime);
+      }
+      Assert.assertTrue("Rank 3 shall be the leader of the quorum", c3.isLeader());
+    }
+
+    loader.slowDownReplicationLoad();
+
+    // Verify logs are identical across all the quorum members
+    while (!RAFT_TEST_UTIL.verifyLogs(regionInfo.getQuorumInfo(), QUORUM_SIZE)) {
+      Thread.sleep(10 * 1000);
+      System.out.println("Verifying logs ....");
+      Assert.assertTrue("Rank 3 shall be the leader of the quorum", c3.isLeader());
+    }
+
+    // Stop the client load
+    loader.stopReplicationLoad();
+
+    System.out.println(transactionNum + " transactions have been successfully replicated");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestPersistLastVotedFor.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestPersistLastVotedFor.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestPersistLastVotedFor.java
new file mode 100644
index 0000000..73683bb
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestPersistLastVotedFor.java
@@ -0,0 +1,83 @@
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.server.LocalConsensusServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TestPersistLastVotedFor {
+  private static int QUORUM_SIZE = 3;
+  private static int QUORUM_MAJORITY = 2;
+  private static HRegionInfo regionInfo;
+  private static RaftTestUtil RAFT_TEST_UTIL = new RaftTestUtil();
+  private static QuorumClient client;
+  private static volatile int transactionNum = 0;
+  private ReplicationLoadForUnitTest loader;
+
+  @Before
+  public void setUp() throws Exception {
+    RAFT_TEST_UTIL.disableVerboseLogging();
+    RAFT_TEST_UTIL.createRaftCluster(QUORUM_SIZE);
+    RAFT_TEST_UTIL.setUsePeristentLog(true);
+    RAFT_TEST_UTIL.assertAllServersRunning();
+    regionInfo = RAFT_TEST_UTIL.initializePeers();
+    RAFT_TEST_UTIL.addQuorum(regionInfo, RAFT_TEST_UTIL.getScratchSetup(QUORUM_SIZE));
+    RAFT_TEST_UTIL.startQuorum(regionInfo);
+
+    client = RAFT_TEST_UTIL.getQuorumClient(regionInfo.getQuorumInfo());
+    transactionNum = 0;
+    loader = new ReplicationLoadForUnitTest(regionInfo, client, RAFT_TEST_UTIL, QUORUM_SIZE,
+      QUORUM_MAJORITY);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RAFT_TEST_UTIL.shutdown();
+  }
+
+  @Test
+  public void testLastVotedForIsPersisted() throws IOException, InterruptedException {
+    // Add some transactions
+    loader.startReplicationLoad(1000);
+    transactionNum = loader.makeProgress(100, transactionNum);
+    loader.stopReplicationLoad();
+
+    RaftQuorumContext leader =
+      RAFT_TEST_UTIL.getLeaderQuorumContext(regionInfo);
+    // Capture the current lastVotedFor
+    ConsensusHost initialLastVotedFor = leader.getLastVotedFor();
+
+    // Stop the consensusServer. lastVotedFor should have been persisted.
+    LocalConsensusServer consensusServer =
+      RAFT_TEST_UTIL.stopLocalConsensusServer(leader.getMyAddress());
+
+    RaftQuorumContext newQuorumContext =
+      RAFT_TEST_UTIL.restartLocalConsensusServer(consensusServer, regionInfo,
+      leader.getMyAddress());
+    ConsensusHost lastVotedForAsReadFromDisk =
+      newQuorumContext.getLastVotedFor();
+    Assert.assertEquals("Last Voted For was not persisted properly",
+      initialLastVotedFor, lastVotedForAsReadFromDisk);
+
+    // Verify that persisting also works when lastVotedFor is null.
+    newQuorumContext.clearVotedFor();
+    consensusServer =
+      RAFT_TEST_UTIL.stopLocalConsensusServer(newQuorumContext.getMyAddress());
+    RaftQuorumContext newQuorumContextAfterSecondRestart =
+      RAFT_TEST_UTIL.restartLocalConsensusServer(consensusServer, regionInfo,
+        newQuorumContext.getMyAddress());
+
+    ConsensusHost emptyLastVotedFor =
+      newQuorumContextAfterSecondRestart.getLastVotedFor();
+    Assert.assertNull(emptyLastVotedFor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRaftEventListener.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRaftEventListener.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRaftEventListener.java
new file mode 100644
index 0000000..7da8372
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRaftEventListener.java
@@ -0,0 +1,130 @@
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.consensus.protocol.Payload;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.server.LocalConsensusServer;
+import org.apache.hadoop.hbase.consensus.server.peer.PeerServer;
+import org.apache.hadoop.hbase.regionserver.DataStoreState;
+import org.apache.hadoop.hbase.regionserver.RaftEventListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRaftEventListener {
+
+  private static int QUORUM_SIZE = 5;
+  private static int QUORUM_MAJORITY = 3;
+  private static HRegionInfo regionInfo;
+  private static RaftTestUtil RAFT_TEST_UTIL = new RaftTestUtil();
+  private static QuorumClient client;
+  private int transactionNum = 0;
+
+  private ReplicationLoadForUnitTest loader;
+
+  /**
+   * A set keeps track of the current unavailable peers
+   */
+  private ConcurrentSkipListSet<String> unavailablePeerSet = new ConcurrentSkipListSet<>();
+
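+  /**
+   * A minimal RaftEventListener that only records peer availability changes
+   * in unavailablePeerSet; all other callbacks are no-ops or return defaults.
+   */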
+  public class MockedRaftEventListener implements RaftEventListener {
+
+    public ByteBuffer becameLeader() {
+      return null;
+    }
+
+    public void becameNonLeader() {}
+    public void commit(final long index, final Payload payload) {}
+    public long getMinUnpersistedIndex() { return -1; }
+
+    @Override
+    public DataStoreState getState() {
+      return null;
+    }
+
+    @Override
+    public void updatePeerAvailabilityStatus(String peerAddress, boolean isAvailable) {
+      if (isAvailable) {
+        unavailablePeerSet.remove(peerAddress);
+      } else {
+        unavailablePeerSet.add(peerAddress);
+      }
+    }
+
+    @Override
+    public void closeDataStore() {}
+
+    @Override
+    public boolean canStepDown() {
+      return false;
+    }
+  }
+
+  private MockedRaftEventListener listener = new MockedRaftEventListener();
+
+  @Before
+  public void setUp() throws Exception {
+    RAFT_TEST_UTIL.disableVerboseLogging();
+    RAFT_TEST_UTIL.createRaftCluster(QUORUM_SIZE);
+    RAFT_TEST_UTIL.setUsePeristentLog(true);
+    RAFT_TEST_UTIL.assertAllServersRunning();
+    regionInfo = RAFT_TEST_UTIL.initializePeers();
+    RAFT_TEST_UTIL.addQuorum(regionInfo, RAFT_TEST_UTIL.getScratchSetup(QUORUM_SIZE));
+    RAFT_TEST_UTIL.startQuorum(regionInfo);
+    client = RAFT_TEST_UTIL.getQuorumClient(regionInfo.getQuorumInfo());
+
+    // Register the listener on the peer with the highest rank, which is equal to QUORUM_SIZE.
+    for (Map.Entry<String, PeerServer> entry :
+        RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, QUORUM_SIZE).getPeerServers().entrySet()) {
+      entry.getValue().registerDataStoreEventListener(listener);
+    }
+
+    loader = new ReplicationLoadForUnitTest(regionInfo, client, RAFT_TEST_UTIL, QUORUM_SIZE,
+      QUORUM_MAJORITY);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RAFT_TEST_UTIL.shutdown();
+  }
+
+  @Test(timeout=500000)
+  public void testRaftEventListenerForAvailability()
+    throws InterruptedException, IOException {
+    // Start the client load
+    loader.startReplicationLoad(100);
+
+    // Let the load make progress for a while
+    transactionNum = loader.makeProgress(1000, transactionNum);
+
+    // Stop the replica whose rank is 4
+    RaftQuorumContext c4 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 4);
+    System.out.println("Stopping one quorum member: " + c4);
+    LocalConsensusServer s4 = RAFT_TEST_UTIL.stopLocalConsensusServer(regionInfo, 4);
+
+    // Let the load make progress; the stopped peer should now be reported as unavailable
+    transactionNum = loader.makeProgress(2000, transactionNum);
+    assertEquals(1, unavailablePeerSet.size());
+
+    // Start the replica whose rank is 4
+    RAFT_TEST_UTIL.restartLocalConsensusServer(s4, regionInfo, c4.getMyAddress());
+    System.out.println("Restarted one quorum member: " + c4);
+
+    // Let the load make progress; the restarted peer should become available again
+    transactionNum = loader.makeProgress(3000, transactionNum);
+    assertEquals(0, unavailablePeerSet.size());
+    System.out.println("There is no element in the unavailablePeerSet !");
+    loader.stopReplicationLoad();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRandomAccessLog.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRandomAccessLog.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRandomAccessLog.java
new file mode 100644
index 0000000..37910b1
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestRandomAccessLog.java
@@ -0,0 +1,93 @@
+package org.apache.hadoop.hbase.consensus;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.consensus.log.RandomAccessLog;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Arena;
+import org.apache.hadoop.hbase.util.BucketAllocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.InHeapArena;
+import org.apache.hadoop.hbase.util.MemoryBuffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestRandomAccessLog {
+
+  File file;
+
+  private final Arena arena = new InHeapArena(BucketAllocator.DEFAULT_BUCKETS,
+    HConstants.ARENA_CAPACITY_DEFAULT);
+
+  @Before
+  public void setUp() throws Exception {
+    file = new File("testBasics");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    RandomAccessLog log = new RandomAccessLog(file, false);
+
+    Assert.assertTrue(log.getCreationTime() !=
+      RandomAccessLog.UNKNOWN_CREATION_TIME);
+
+    final int term = 1;
+    final int startIndex = 1;
+    final int middleIndex = 500;
+    final int endIndex = 1000;
+    final String readSessionKey = "test";
+
+    for (int i = startIndex; i <= endIndex; i++) {
+      WALEdit edit = new WALEdit();
+      edit.add(new KeyValue(Bytes.toBytes("test" + i), System.currentTimeMillis()));
+      log.append(term, i, WALEdit.serializeToByteBuffer(Arrays.asList(edit),
+              1234567890L, Compression.Algorithm.NONE));
+    }
+
+    Assert.assertEquals(term, log.getCurrentTerm());
+    Assert.assertEquals(startIndex, log.getInitialIndex());
+    Assert.assertEquals(endIndex, log.getLastIndex());
+    Assert.assertEquals(endIndex, log.getTxnCount());
+
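+    // Truncate everything above middleIndex; only [startIndex, middleIndex]
+    // should remain in the log.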
+    log.truncate(middleIndex + 1);
+    Assert.assertEquals(term, log.getCurrentTerm());
+    Assert.assertEquals(startIndex, log.getInitialIndex());
+    Assert.assertEquals(middleIndex, log.getLastIndex());
+    Assert.assertEquals(middleIndex, log.getTxnCount());
+
+    log.finalizeForWrite();
+
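+    // Re-open the finalized log and rebuild its index; the metadata should
+    // match the original writer's view.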
+    RandomAccessLog log2 = new RandomAccessLog(file, false);
+    log2.rebuild(readSessionKey);
+    Assert.assertEquals(log.getCurrentTerm(), log2.getCurrentTerm());
+    Assert.assertEquals(log.getInitialIndex(), log2.getInitialIndex());
+    Assert.assertEquals(log.getLastIndex(), log2.getLastIndex());
+    Assert.assertEquals(log.getTxnCount(), log2.getTxnCount());
+
+
+    for (int i = startIndex; i <= middleIndex; i++) {
+      MemoryBuffer buffer = log2.getTransaction(term, i, readSessionKey, arena);
+      List<WALEdit> txns = WALEdit.deserializeFromByteBuffer(buffer.getBuffer());
+      Assert.assertEquals(1, txns.size());
+      Assert.assertEquals(1, txns.get(0).getKeyValues().size());
+      byte[] row = txns.get(0).getKeyValues().get(0).getRow();
+      Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("test" + i), row));
+      arena.freeByteBuffer(buffer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestReadOnlyLog.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestReadOnlyLog.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestReadOnlyLog.java
new file mode 100644
index 0000000..66a1e36
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestReadOnlyLog.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.consensus;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.consensus.log.LogWriter;
+import org.apache.hadoop.hbase.consensus.log.RandomAccessLog;
+import org.apache.hadoop.hbase.consensus.log.ReadOnlyLog;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Arena;
+import org.apache.hadoop.hbase.util.BucketAllocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.InHeapArena;
+import org.apache.hadoop.hbase.util.MemoryBuffer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestReadOnlyLog {
+  private static final Logger LOG = LoggerFactory.getLogger(
+          TestReadOnlyLog.class);
+  private static final int TOTAL_COMMIT = 100;
+  private static final long INITIAL_TERM = 1;
+  private static final long INITIAL_INDEX = 1;
+  private static final KeyValue.KVComparator comparator = new KeyValue.KVComparator();
+  private static final int CONCURRENT_READER_CNT = 10;
+  private final AtomicInteger SUCCESS_CNT = new AtomicInteger(0);
+
+  private static Random random;
+  private static File file;
+  private static ReadOnlyLog readOnlyLog;
+
+  private final Arena arena = new InHeapArena(BucketAllocator.DEFAULT_BUCKETS,
+    HConstants.ARENA_CAPACITY_DEFAULT);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    random = new Random();
+    file = new File("TestReadOnlyLog_" + INITIAL_INDEX + "_" + INITIAL_INDEX);
+    file.createNewFile();
+    readOnlyLog = new ReadOnlyLog(file, INITIAL_TERM, INITIAL_INDEX);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    readOnlyLog.closeAndDelete();
+  }
+
+  @Test(timeout=50000)
+  public void testConcurrentReader() throws Exception {
+    prepareLogFile();
+
+    Assert.assertTrue(readOnlyLog.getCreationTime() !=
+      RandomAccessLog.UNKNOWN_CREATION_TIME);
+
+    ExecutorService service = Executors.newFixedThreadPool(CONCURRENT_READER_CNT);
+
+    for (int i = 0 ; i < CONCURRENT_READER_CNT; i++) {
+      final String sessionKey = Integer.toString(i);
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          readLogFile(sessionKey);
+        }
+      });
+    }
+
+    service.shutdown();
+    try {
+      service.awaitTermination(50000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {}
+
+    Assert.assertEquals(CONCURRENT_READER_CNT, SUCCESS_CNT.get());
+
+  }
+
+  private void readLogFile(String sessionKey) {
+    // Choose a random read point
+    long readPointer = (long)random.nextInt(TOTAL_COMMIT);
+
+    try {
+      for (long i = readPointer; i < TOTAL_COMMIT; i++) {
+        MemoryBuffer buffer = readOnlyLog.getTransaction(INITIAL_TERM, i, sessionKey, arena);
+
+        // Read the commit entry
+        List<WALEdit> txns = WALEdit.deserializeFromByteBuffer(buffer.getBuffer());
+
+        WALEdit edit = txns.get(0);
+        KeyValue kv = edit.getKeyValues().get(0);
+        KeyValue expectedKV = new KeyValue(Bytes.toBytes(i), i);
+
+        // Verify the commit entry
+        Assert.assertEquals(1, txns.size());
+        Assert.assertEquals(1, edit.size());
+        Assert.assertEquals(0, comparator.compare(expectedKV, kv));
+        arena.freeByteBuffer(buffer);
+      }
+
+      // Increase the success cnt
+      SUCCESS_CNT.incrementAndGet();
+      LOG.info("Reader #" + sessionKey + " has read and verified all the commits from " +
+        + readPointer + " to " + TOTAL_COMMIT);
+
+    } catch (Exception e) {
+      // Fail the unit test if any exception caught
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Unexpected exception: " + e);
+    }
+  }
+
+  private void prepareLogFile() throws IOException {
+    LogWriter writer = new LogWriter(new RandomAccessFile(file, "rw"), false);
+
+    // Generate the header
+    final long initialIndex = 0;
+    final long term = 1;
+    writer.writeFileHeader(term, initialIndex);
+
+    // Write TOTAL_COMMIT transactions to the log file
+    List<WALEdit> txns;
+    WALEdit edit;
+    for (long i = initialIndex; i < TOTAL_COMMIT; i++) {
+      edit = new WALEdit();
+      edit.add(new KeyValue(Bytes.toBytes(i), i));
+      txns = Arrays.asList(edit);
+      writer.append(i, WALEdit.serializeToByteBuffer(txns, 1234567890L,
+              Compression.Algorithm.NONE));
+    }
+
+    // Close the writer
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestAsyncStatesInRaftStateMachine.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestAsyncStatesInRaftStateMachine.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestAsyncStatesInRaftStateMachine.java
new file mode 100644
index 0000000..17d7ace
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestAsyncStatesInRaftStateMachine.java
@@ -0,0 +1,242 @@
+package org.apache.hadoop.hbase.consensus.fsm;
+
+import com.google.common.util.concurrent.SettableFuture;
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.consensus.RaftTestUtil;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.raft.states.RaftStateType;
+import org.apache.hadoop.hbase.consensus.server.LocalConsensusServer;
+import org.apache.hadoop.hbase.consensus.server.peer.AbstractPeer;
+import org.apache.hadoop.hbase.consensus.server.peer.PeerServer;
+import org.apache.hadoop.hbase.consensus.server.peer.states.PeerServerStateType;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+public class TestAsyncStatesInRaftStateMachine {
+  private static final Logger LOG = LoggerFactory.getLogger(
+    TestAsyncStatesInRaftStateMachine.class);
+
+  private static final int QUORUM_SIZE = 5;
+  private static final int QUORUM_MAJORITY = 3;
+  private static HRegionInfo regionInfo;
+  private static RaftTestUtil RAFT_TEST_UTIL = new RaftTestUtil();
+  private Configuration conf;
+  private QuorumClient client;
+  private ExecutorService executorService;
+
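+  /**
+   * An executor service that never runs submitted tasks: submit() returns a
+   * future that is never completed, so any asynchronous FSM work handed to it
+   * stays pending forever.
+   */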
+  class DummyExecutorService extends AbstractExecutorService {
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return false;
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+      return SettableFuture.create();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+    }
+  }
+
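+  // Swap in dummy executors so that the corresponding large read/write FSM
+  // operations never complete, allowing the tests to observe stuck states.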
+  public void setUpExecutors(boolean useDummyWriteOpsPool,
+                             boolean useDummyReadOpsPool) {
+    FSMLargeOpsExecutorService
+      .initializeForTesting(
+        (useDummyWriteOpsPool ?
+          new DummyExecutorService() :
+          FSMLargeOpsExecutorService.createWriteOpsExecutorService(conf)),
+        (useDummyReadOpsPool ?
+          new DummyExecutorService() :
+          FSMLargeOpsExecutorService.createReadOpsExecutorService(conf)));
+  }
+
+  public void setUp(boolean useDummyWriteOpsPool,
+                    boolean useDummyReadOpsPool) throws Exception {
+    conf = RAFT_TEST_UTIL.getConf();
+    setUpExecutors(useDummyWriteOpsPool, useDummyReadOpsPool);
+    RAFT_TEST_UTIL.createRaftCluster(QUORUM_SIZE);
+    RAFT_TEST_UTIL.assertAllServersRunning();
+    RAFT_TEST_UTIL.setUsePeristentLog(true);
+    regionInfo = RAFT_TEST_UTIL.initializePeers();
+    RAFT_TEST_UTIL.addQuorum(regionInfo, null);
+    RAFT_TEST_UTIL.startQuorum(regionInfo);
+    client = RAFT_TEST_UTIL.getQuorumClient(regionInfo.getQuorumInfo());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (executorService != null) {
+      while (!executorService.isTerminated()) {
+        executorService.shutdownNow();
+        Threads.sleep(1000);
+      }
+    }
+    LOG.info("Shutting down the FSM");
+    RAFT_TEST_UTIL.setRaftQuorumContextClass(RaftQuorumContext.class);
+    RAFT_TEST_UTIL.shutdown();
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void ensureNoProgressIfSendAppendRequestIsNotComplete()
+    throws Exception {
+    setUp(true, false);
+    try {
+      testReplicatingCommitsAsync(1).get(3000, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      // the peer with the lowest timeout will try to write the
+      // votedFor and should get stuck.
+      RaftQuorumContext r = RAFT_TEST_UTIL.getRaftQuorumContextByRank(
+        regionInfo, 5);
+      assertEquals(RaftStateType.SEND_VOTE_REQUEST,
+        r.getCurrentRaftState().getStateType());
+      throw e;
+    }
+  }
+
+  public PeerServer killAndRevivePeerWhileReplicatingEdits(
+    boolean blockOnHandleAppendResponse) throws Exception {
+    setUp(false, blockOnHandleAppendResponse);
+    // Make one single commit.
+    testReplicatingCommits(1);
+
+    RaftQuorumContext leader =
+      RAFT_TEST_UTIL.getLeaderQuorumContext(regionInfo);
+
+    // Stop the peer with rank = 1.
+    RaftQuorumContext peer =
+      RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 1);
+
+    PeerServer peerServer = leader.getPeerServers().get(peer.getMyAddress());
+    LocalConsensusServer peerConsensusServer =
+      RAFT_TEST_UTIL.stopLocalConsensusServer(regionInfo, 1);
+
+    // Replicate some other commits, the dead server will miss out.
+    testReplicatingCommits(10);
+
+    // Restart that dead server.
+    RAFT_TEST_UTIL.restartLocalConsensusServer(peerConsensusServer,
+      regionInfo, peer.getMyAddress());
+
+    // Wait for dead server to come back
+    long start = System.currentTimeMillis();
+    while (!RAFT_TEST_UTIL.verifyLogs(regionInfo.getQuorumInfo(), QUORUM_SIZE, true) &&
+      !blockOnHandleAppendResponse) {
+      Thread.sleep(1000);
+      // stop if we waited for more than 100 seconds
+      if (System.currentTimeMillis() - start > 100000) {
+        Assert.fail("Timed out while waiting for dead server to come back");
+      }
+    }
+
+    return peerServer;
+  }
+
+  // TODO
+  // The two tests below are unstable and can be flaky depending on the timing
+  // of killing and restarting the server. Adding sleeps could help, but would
+  // be no guarantee that the tests won't break in the future.
+  /**
+  @Test
+  public void ensureNoProgressIfPeerHandleAppendResponseIsNotComplete()
+    throws Exception {
+    PeerServer s = killAndRevivePeerWhileReplicatingEdits(true);
+    assertTrue(
+      ((AbstractPeer) s).getStateMachineService().getCurrentState().getStateType().equals(
+        PeerServerStateType.HANDLE_APPEND_RESPONSE));
+  }
+
+  @Test
+  public void ensureProgressIfPeerHandleAppendResponseIsComplete()
+    throws Exception {
+    PeerServer s = killAndRevivePeerWhileReplicatingEdits(false);
+    assertTrue(
+      ((AbstractPeer) s).getStateMachineService().getCurrentState().getStateType().equals(
+        PeerServerStateType.PEER_FOLLOWER));
+  }
+   */
+
+  @Test
+  public void ensureProgressWhenSendAppendRequestCompletes() throws Exception {
+    setUp(false, false);
+    testReplicatingCommitsAsync(1).get(3000, TimeUnit.MILLISECONDS);
+  }
+
+  private Future testReplicatingCommitsAsync(final int numCommits)
+    throws Exception {
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        testReplicatingCommits(numCommits);
+      }
+    };
+    executorService = Executors.newSingleThreadExecutor();
+    return executorService.submit(r);
+  }
+
+  private void testReplicatingCommits(int numCommits) {
+    try {
+      RAFT_TEST_UTIL.waitForLeader(regionInfo);
+      RaftQuorumContext leader =
+        RAFT_TEST_UTIL.getLeaderQuorumContext(regionInfo);
+      Assert.assertNotNull(leader);
+
+      RAFT_TEST_UTIL.dumpStates(regionInfo);
+      for (int i = 0; i < numCommits; i++) {
+        client.replicateCommits(Arrays.asList(generateTestingWALEdit()));
+
+      }
+      RAFT_TEST_UTIL.dumpStates(regionInfo);
+    } catch (Exception e) {
+      LOG.error("Errors: ", e);
+      fail("Unexpected exception: " + e);
+    }
+  }
+
+  private static WALEdit generateTestingWALEdit() {
+    KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("TestQuorum"));
+    return new WALEdit(Arrays.asList(kv));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachine.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachine.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachine.java
new file mode 100644
index 0000000..3125c30
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachine.java
@@ -0,0 +1,194 @@
+package org.apache.hadoop.hbase.consensus.fsm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A test class extending FiniteStateMachine, exposing some of its internals
+ * in order to increase testability.
+ */
+@InterfaceAudience.Private
+class TestableFiniteStateMachine extends FiniteStateMachine {
+
+  public TestableFiniteStateMachine(State defaultStartState) {
+    super(TestableFiniteStateMachine.class.getName());
+    setStartState(defaultStartState);
+  }
+
+  public HashMap<Transition, State> getTransitionMap(
+          final State s) {
+    return super.getTransitionMap(s);
+  }
+
+  public Transition getNextTransition(final State s, final Event e) {
+    return super.getNextTransition(s, e);
+  }
+}
+
+interface ImmutableTestContext {
+  public boolean getA();
+}
+
+interface MutableTestContext extends ImmutableTestContext, MutableContext {
+  public void setA(boolean x);
+}
+
+class TestContext implements ImmutableTestContext, MutableTestContext {
+  private boolean a;
+
+  public TestContext() {
+    this.a = false;
+  }
+
+  public boolean getA() { return a; }
+  public void setA(boolean x) { a = x; }
+}
+
+enum States implements StateType {
+  NONE,
+  START,
+  STOP,
+  A,
+  B,
+  MAX
+}
+
+class TestState extends State {
+  protected MutableTestContext c;
+
+  public TestState(final States t, final MutableTestContext c) {
+    super(t);
+    this.c = c;
+  }
+  public void onEntry(final Event e) {}
+  public void onExit(final Event e) {}
+}
+
+class Start extends TestState {
+  public Start(final MutableTestContext c) {
+    super(States.START, c);
+  }
+}
+
+class Stop extends TestState {
+  public Stop(final MutableTestContext c) {
+    super(States.STOP, c);
+  }
+}
+
+class A extends TestState {
+  public A(final MutableTestContext c) {
+    super(States.A, c);
+  }
+  public void onEntry(final Event e) {
+    if (c != null) c.setA(!c.getA());
+  }
+}
+
+enum Events implements EventType {
+  NONE,
+  TYPE1,
+  TYPE2,
+  MAX
+}
+
+enum Transitions implements TransitionType {
+  NONE,
+  NOOP,
+  IS_A,
+  ON_TYPE1,
+  MAX
+}
+
+abstract class TestCondition implements Conditional {
+  protected ImmutableTestContext c;
+
+  public TestCondition(final ImmutableTestContext c) {
+    this.c = c;
+  }
+
+  public abstract boolean isMet(final Event e);
+}
+
+class IsNoop extends TestCondition {
+  public IsNoop(final ImmutableTestContext c) {
+    super(c);
+  }
+  public boolean isMet(final Event e) { return true; }
+}
+
+class IsA extends TestCondition {
+  public IsA(final ImmutableTestContext c) {
+    super(c);
+  }
+  public boolean isMet(final Event e) { return c.getA(); }
+}
+
+public class TestFiniteStateMachine {
+  TestableFiniteStateMachine s;
+  TestContext c = new TestContext();
+
+  State START = new Start(c);
+  State STOP = new Stop(c);
+  State A = new A(c);
+
+  Transition NOOP = new Transition(Transitions.NOOP, new IsNoop(c));
+  Transition IS_A = new Transition(Transitions.IS_A, new IsA(c));
+  Transition ON_TYPE1 = new Transition(Transitions.ON_TYPE1,
+          new OnEvent(Events.TYPE1));
+
+  Event TYPE1 = new Event(Events.TYPE1);
+
+  @Before
+  public void createNewStateMachine() {
+    s = new TestableFiniteStateMachine(START);
+  }
+
+  @Test
+  public void shouldAddTransitions() throws Exception {
+    s.addTransition(START, STOP, NOOP);
+    HashMap<Transition, State> transitionMap = s.getTransitionMap(START);
+    assertNotNull(transitionMap);
+    assertEquals(STOP, transitionMap.get(NOOP));
+  }
+
+  @Test
+  public void shouldTransitionOnMetCondition() throws Exception {
+    s.addTransition(START, STOP, NOOP);
+    assertEquals(NOOP, s.getNextTransition(START, null));
+  }
+
+  @Test
+  public void shouldIgnoreUnmetConditionals() throws Exception {
+    s.addTransition(START, STOP, IS_A);
+    assertNull(s.getNextTransition(START, null));
+  }
+
+  @Test
+  public void shouldTransitionOnEvent() throws Exception {
+    s.addTransition(START, STOP, ON_TYPE1);
+    assertEquals(ON_TYPE1, s.getNextTransition(START, TYPE1));
+  }
+
+  @Test
+  public void shouldIgnoreInvalidEvents() throws Exception {
+    Event e = new Event(Events.TYPE2);
+    s.addTransition(START, STOP, ON_TYPE1);
+    assertNull(s.getNextTransition(START, e));
+  }
+
+  @Test
+  public void shouldBeGreedy() throws Exception {
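+    // ON_TYPE1 moves START to A; A.onEntry() flips the context flag, so the
+    // IS_A transition is immediately met and a single getNextState() call
+    // lands in STOP.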
+    s.addTransition(START, A, ON_TYPE1);
+    s.addTransition(A, STOP, IS_A);
+    s.setStartState(START);
+    assertEquals(STOP, s.getNextState(TYPE1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachineService.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachineService.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachineService.java
new file mode 100644
index 0000000..d864737
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestFiniteStateMachineService.java
@@ -0,0 +1,144 @@
+package org.apache.hadoop.hbase.consensus.fsm;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.serial.AsyncSerialExecutorServiceImpl;
+import org.apache.hadoop.hbase.util.serial.SerialExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFiniteStateMachineService {
+  CountDownLatch latch;
+  CountDownMachine fsm;
+  FiniteStateMachineService service;
+  boolean multiplexedFSM;
+
+  public TestFiniteStateMachineService(boolean multiplexedFSM) {
+    this.multiplexedFSM = multiplexedFSM;
+  }
+
+  @SuppressWarnings("serial")
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> parameters() {
+    return new ArrayList<Boolean[]>() {{
+      add(new Boolean[]{ true });
+      add(new Boolean[]{ false });
+    }};
+  }
+
+  @Before
+  public void setUp() {
+    latch = new CountDownLatch(3);
+    fsm = new CountDownMachine("CountDownMachine", latch);
+    if (multiplexedFSM) {
+      SerialExecutorService serialService =
+          new AsyncSerialExecutorServiceImpl(HConstants.DEFAULT_FSM_MUX_THREADPOOL_SIZE,
+              "serialScheduler");
+      service = new ConstitutentFSMService(fsm, serialService.createStream());
+    } else {
+      service = new FiniteStateMachineServiceImpl(fsm);
+    }
+  }
+
+  @Test
+  public void shouldDrainEventQueueOnShutdown() throws InterruptedException {
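+    // Queue up events while the machine is held back by its start latch, then
+    // request shutdown and release the machine; every queued event should
+    // still be processed before termination.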
+    assertFalse("Service should not be shutdown", service.isShutdown());
+    for (int i = 0; i < latch.getCount(); i++) {
+      assertTrue("Event should be scheduled",
+        service.offer(new Event(Events.PUSH)));
+    }
+    service.shutdown();
+    fsm.start();  // Allow the CountDownMachine to handle events.
+    assertTrue("Event queue should be drained after shutdown",
+      latch.await(100, TimeUnit.MILLISECONDS));
+    assertTrue("Service should be terminated",
+      service.awaitTermination(100, TimeUnit.MILLISECONDS));
+  }
+
+  enum States implements StateType {
+    NONE,
+    WAIT,
+    PUSH,
+    MAX
+  }
+
+  enum Events implements EventType {
+    NONE,
+    PUSH,
+    MAX
+  }
+
+  enum Transitions implements TransitionType {
+    NONE,
+    UNCONDITIONAL,
+    ON_PUSH,
+    MAX
+  }
+
+  private class CountDownMachine extends FiniteStateMachine {
+    CountDownLatch startDelay;
+
+    public CountDownMachine(String name, CountDownLatch latch) {
+      super(name);
+      this.startDelay = new CountDownLatch(1);
+
+      State WAIT = new CountDownState(States.WAIT, null);
+      State PUSH = new CountDownState(States.PUSH, latch);
+
+      addTransition(WAIT, PUSH,
+        new Transition(Transitions.ON_PUSH, new OnEvent(Events.PUSH)));
+      addTransition(PUSH, WAIT,
+        new Transition(Transitions.UNCONDITIONAL, new Unconditional()));
+      setStartState(WAIT);
+    }
+
+    public void start() {
+      startDelay.countDown();
+    }
+
+    /**
+     * Delay handling events until start has been called. This works around
+     * the race condition between adding events to the queue and shutdown
+     * of the FSM service.
+     * @param e Event to be handled
+     */
+    @Override
+    public void handleEvent(final Event e) {
+      try {
+        startDelay.await();
+        super.handleEvent(e);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    private class CountDownState extends State {
+      CountDownLatch latch;
+
+      public CountDownState(StateType t, CountDownLatch latch) {
+        super(t);
+        this.latch = latch;
+      }
+
+      @Override
+      public void onEntry(Event e) {
+        if (latch != null) {
+          latch.countDown();
+        }
+      }
+
+      @Override
+      public void onExit(Event e) {}
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestIncompleteStates.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestIncompleteStates.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestIncompleteStates.java
new file mode 100644
index 0000000..22c62e1
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/fsm/TestIncompleteStates.java
@@ -0,0 +1,291 @@
+package org.apache.hadoop.hbase.consensus.fsm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import junit.framework.Assert;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.serial.AsyncSerialExecutorServiceImpl;
+import org.apache.hadoop.hbase.util.serial.SerialExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIncompleteStates {
+  enum States implements StateType {
+    S1,
+    S2,
+    S3,
+    S4,
+    S5,
+    S6
+  }
+
+  enum Events implements EventType {
+    E1,
+    E2,
+    E3
+  }
+
+  enum Transitions implements TransitionType {
+    T1,
+    T2,
+    T3,
+    T4,
+    T5
+  }
+
+  private class TestableState extends State {
+    boolean entered = false;
+
+    public TestableState(StateType stateType) {
+      super(stateType);
+    }
+
+    @Override
+    public void onEntry(Event e) {
+      entered = true;
+    }
+
+    @Override
+    public void onExit(Event e) {
+      countDownLatch.countDown();
+    }
+
+    public boolean isEntered() {
+      return entered;
+    }
+  }
+
+  /**
+   * Mocks a state whose onEntry() logic has not finished by the time the
+   * onEntry() method call returns, i.e. a state with pending asynchronous work.
+   */
+  private class TestableStateWithAsyncWork extends TestableState {
+    boolean isComplete = false;
+    SettableFuture<?> pendingFuture;
+
+    public TestableStateWithAsyncWork(StateType stateType) {
+      super(stateType);
+    }
+
+    @Override
+    public boolean isAsyncState() {
+      return true;
+    }
+
+    @Override
+    public void onEntry(Event e) {
+      entered = true;
+      pendingFuture = SettableFuture.create();
+    }
+
+    @Override
+    public boolean isComplete() {
+      return isComplete;
+    }
+
+    @Override
+    public ListenableFuture<?> getAsyncCompletion() {
+      return pendingFuture;
+    }
+
+    public void setComplete() {
+      if (entered) {
+        isComplete = true;
+        pendingFuture.set(null);
+      }
+    }
+  }
+
+  // States
+  TestableState S1 = new TestableState(States.S1);
+  TestableStateWithAsyncWork S2 = new TestableStateWithAsyncWork(States.S2);
+  TestableState S3 = new TestableState(States.S3);
+  TestableState S4 = new TestableState(States.S4);
+  TestableState S5 = new TestableState(States.S5);
+  TestableStateWithAsyncWork S6 = new TestableStateWithAsyncWork(States.S6);
+
+  // Transitions
+  Transition T1 = new Transition(Transitions.T1, new OnEvent(Events.E1));
+  Transition T2 = new Transition(Transitions.T2, new OnEvent(Events.E2));
+  Transition T3 = new Transition(Transitions.T3, new Unconditional());
+  Transition T4 = new Transition(Transitions.T4, new Unconditional());
+  Transition T5 = new Transition(Transitions.T5, new OnEvent(Events.E3));
+
+  // Events
+  Event E1 = new Event(Events.E1);
+  Event E2 = new Event(Events.E2);
+  Event E3 = new Event(Events.E3);
+
+  CountDownLatch countDownLatch;
+  FiniteStateMachine fsm;
+  FiniteStateMachineService fsmService;
+  boolean multiplexedFSM;
+
+  public TestIncompleteStates(boolean multiplexedFSM) {
+    this.multiplexedFSM = multiplexedFSM;
+  }
+
+  @SuppressWarnings("serial")
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> parameters() {
+    return new ArrayList<Boolean[]>() {{
+      add(new Boolean[]{ true });
+      add(new Boolean[]{ false });
+    }};
+  }
+
+  @Before
+  public void setUp() {
+    fsm = new FiniteStateMachine("fsm");
+    fsm.setStartState(S1);
+    if (multiplexedFSM) {
+      SerialExecutorService serialService =
+          new AsyncSerialExecutorServiceImpl(HConstants.DEFAULT_FSM_MUX_THREADPOOL_SIZE,
+              "serialScheduler");
+      fsmService = new ConstitutentFSMService(fsm, serialService.createStream());
+    } else {
+      fsmService = new FiniteStateMachineServiceImpl(fsm);
+    }
+  }
+
+  public void countDown() {
+    try {
+      countDownLatch.await(500, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    Assert.assertEquals(0, countDownLatch.getCount());
+  }
+
+  public void resetLatch(int expectedCount) {
+    countDownLatch = new CountDownLatch(expectedCount);
+  }
+
+  @Test
+  public void testIncompleteStates() throws InterruptedException {
+    fsm.addTransition(S1, S2, T1);
+    fsm.addTransition(S2, S3, T2);
+
+    // Offer the event E1.
+    resetLatch(1);
+    fsmService.offer(E1);
+    countDown();
+
+    // Check that the state S1 is complete, since it is a regular state.
+    Assert.assertEquals(true, S1.isComplete());
+
+    // Check that we land up in S2.
+    Assert.assertEquals(S2, fsmService.getCurrentState());
+    Assert.assertEquals(0, fsmService.getNumPendingEvents());
+
+    // Issue an event to transition to S3. Note that S2 mimics a state which
+    // has pending async work.
+    resetLatch(0);
+    fsmService.offer(E2);
+    countDown();
+
+    Assert.assertEquals(1, fsmService.getNumPendingEvents());
+    Assert.assertEquals(S2, fsmService.getCurrentState());
+
+    // Now set the state to be completed
+    resetLatch(1);
+    S2.setComplete();
+    Assert.assertEquals(true, S2.isComplete());
+    countDown();
+
+    Assert.assertEquals(0, fsmService.getNumPendingEvents());
+    Assert.assertEquals(S3, fsmService.getCurrentState());
+  }
+
+  /**
+   * This tests the case when you have a scenario like (S2 is an async state):
+   * S2--(Conditional)-->S3--(Conditional)-->S4--(Event E)-->S5.
+   *
+   * If we are in S2, and the async task is not complete, we should not
+   * transition to S3, and then to S4. Only when the task is complete, should we
+   * automatically transition to S3, and then to S4.
+   *
+   * Once S2 completes and we have reached S4, offering the event E should take
+   * us to S5; E should not be discarded.
+   */
+  @Test
+  public void testStateTransitionFromAsyncStatesWithConditions() {
+    fsm.addTransition(S1, S2, T1);
+    fsm.addTransition(S2, S3, T3);
+    fsm.addTransition(S3, S4, T4);
+    fsm.addTransition(S4, S5, T5);
+
+    // Offer the event E1.
+    resetLatch(1);
+    fsmService.offer(E1);
+    countDown();
+
+    // Check that the state S1 is complete, since it is a regular state.
+    Assert.assertEquals(true, S1.isComplete());
+
+    // Check that the current state is S2, and NOT S3, even though there is an
+    // unconditional transition from S2 to S3.
+    Assert.assertEquals(S2, fsm.getCurrentState());
+
+    // Now set the state S2 to be completed.
+    resetLatch(2);
+    S2.setComplete();
+    countDown();
+
+    // We should now be in S4, by taking a conditional transition to S3, and
+    // then to S4.
+    Assert.assertEquals(S4, fsmService.getCurrentState());
+
+    // Also check that we visited S3.
+    Assert.assertTrue(S3.isEntered());
+
+
+    // Now offer E3. Having already taken the unconditional transitions from S2
+    // to S3 and from S3 to S4, the machine should transition to S5 because E3
+    // was offered.
+    resetLatch(1);
+    fsmService.offer(E3);
+    countDown();
+
+    Assert.assertEquals(S5, fsm.getCurrentState());
+  }
+
+  /**
+   * We should not abort an event when we are in an async state.
+   */
+  @Test
+  public void testEventNotAbortedWhenInAsyncState() {
+    // S2 and S6 are async states, which transition on E2 and E3 respectively.
+    fsm.addTransition(S1, S2, T1);
+    fsm.addTransition(S2, S6, T2);
+    fsm.addTransition(S6, S3, T5);
+
+    resetLatch(1);
+    fsmService.offer(E1);
+    countDown();
+
+    // We offer E3 before S2 is complete, so that it is in the event queue.
+    // After E2 we transition to S6, but E3 is not applicable yet because the
+    // state is not complete. We should not discard E3.
+    resetLatch(1);
+    fsmService.offer(E2);
+    fsmService.offer(E3);
+    S2.setComplete();
+    countDown();
+
+    resetLatch(1);
+    S6.setComplete();
+    countDown();
+
+    Assert.assertEquals(S3, fsm.getCurrentState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestCachedFileChannel.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestCachedFileChannel.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestCachedFileChannel.java
new file mode 100644
index 0000000..eab543c
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestCachedFileChannel.java
@@ -0,0 +1,76 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class TestCachedFileChannel {
+  final static Log LOG = LogFactory.getLog(TestCachedFileChannel.class);
+
+  @Test
+  public void testNormalFile() throws IOException {
+    final int size = 100000;
+    final int max = 10000;
+    final String path = "/tmp/testNormalFile";
+    writeToFile(path, size);
+    CachedFileChannel cfc = new CachedFileChannel(new RandomAccessFile(path, "r"), max);
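+    // The loop below mixes reads that fit within the 'max'-byte cache with
+    // larger reads that have to go to the underlying file.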
+
+    final int trials = 1000;
+    Random rand = new Random();
+    long now = System.currentTimeMillis();
+    LOG.debug("Setting the seed to " + now);
+    rand.setSeed(now);
+    for(int i = 0; i < trials; i++) {
+      int offset = rand.nextInt(size);
+      int length;
+      if (rand.nextBoolean()) {
+        // read something small that fits in memory.
+        length = rand.nextInt(Math.min(max, size-offset));
+      } else {
+        length = rand.nextInt(size - offset);
+      }
+
+      verifyData(cfc, offset, length);
+    }
+
+    // Do some reads that extend all the way to the end of the file.
+    int more = 100;
+    for(int i = 0; i < more; i++) {
+      int length = rand.nextInt((int)(1.5 * max));
+      int offset = size - length;
+      verifyData(cfc, offset, length);
+    }
+
+    new File(path).delete();
+  }
+
+  private void verifyData(CachedFileChannel cfc, int offset, int length) throws IOException {
+    LOG.debug("Verifying data " + length + " bytes, starting from " + offset);
+    ByteBuffer bb = ByteBuffer.allocate(length);
+    cfc.read(bb, offset);
+    bb.flip();
+
+    for(int i = 0; i < length; ++i) {
+      Assert.assertEquals("Mismatch at location " + (offset + i),
+          (byte)(offset + i), bb.get());
+    }
+  }
+
+  private void writeToFile(String path, int size) throws IOException {
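+    // Fill the file with the byte pattern 0, 1, 2, ... (mod 256) so that each
+    // position's value equals its offset truncated to a byte.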
+    FileOutputStream fsOut = new FileOutputStream(path);
+    for(int i = 0; i < size; ++i) {
+      fsOut.write(i);
+    }
+    fsOut.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestRemoteLogFetcher.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestRemoteLogFetcher.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestRemoteLogFetcher.java
new file mode 100644
index 0000000..0605222
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestRemoteLogFetcher.java
@@ -0,0 +1,167 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.consensus.RaftTestUtil;
+import org.apache.hadoop.hbase.consensus.ReplicationLoadForUnitTest;
+import org.apache.hadoop.hbase.consensus.client.FetchTask;
+import org.apache.hadoop.hbase.consensus.client.QuorumClient;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestRemoteLogFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(TestRemoteLogFetcher.class);
+  private static int QUORUM_SIZE = 3;
+  private static int QUORUM_MAJORITY = 2;
+  private static HRegionInfo regionInfo;
+  private static RaftTestUtil RAFT_TEST_UTIL = new RaftTestUtil();
+  private static QuorumClient client;
+  private static volatile int transactionNum = 0;
+  private ReplicationLoadForUnitTest loader;
+
+  @Before
+  public void setUp() throws Exception {
+//    RAFT_TEST_UTIL.disableVerboseLogging();
+    RAFT_TEST_UTIL.createRaftCluster(QUORUM_SIZE);
+    RAFT_TEST_UTIL.setUsePeristentLog(true);
+    RAFT_TEST_UTIL.assertAllServersRunning();
+    regionInfo = RAFT_TEST_UTIL.initializePeers();
+    RAFT_TEST_UTIL.addQuorum(regionInfo, RAFT_TEST_UTIL.getScratchSetup(QUORUM_SIZE));
+    RAFT_TEST_UTIL.startQuorum(regionInfo);
+    client = RAFT_TEST_UTIL.getQuorumClient(regionInfo.getQuorumInfo());
+    transactionNum = 0;
+    loader = new ReplicationLoadForUnitTest(regionInfo, client, RAFT_TEST_UTIL, QUORUM_SIZE,
+        QUORUM_MAJORITY);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RAFT_TEST_UTIL.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testLogFileStatusRetrieval() throws Exception {
+    RaftQuorumContext c3 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 3);
+    RaftQuorumContext c2 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 2);
+    RaftQuorumContext c1 = RAFT_TEST_UTIL.getRaftQuorumContextByRank(regionInfo, 1);
+
+    TransactionLogManager l3 = (TransactionLogManager)c3.getLogManager();
+    // Around 60 indices per log file on peer 3
+    l3.setRaftLogRollSize(1024 * 3000);
+    TransactionLogManager l1 = (TransactionLogManager)c1.getLogManager();
+    // Around 40 indices per log file on peer 1
+    l1.setRaftLogRollSize(1024 * 2000);
+
+    // Start the client load
+    loader.startReplicationLoad(100);
+
+    // Let the traffic fly for a while
+    transactionNum = loader.makeProgress(10000, transactionNum);
+    l3.rollCommittedLogs();
+    l1.rollCommittedLogs();
+
+    RemoteLogFetcher fetcher = new RemoteLogFetcher(c2);
+    fetcher.initializeQuorumClients();
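+    // Ask the other peers for the status of their committed log files,
+    // starting from index 0.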
+    List<Pair<String, List<LogFileInfo>>> statuses = fetcher.getPeerCommittedLogStatus(0);
+    assertFalse(statuses.isEmpty());
+    long minIndex = Long.MAX_VALUE;
+    long maxIndex = Long.MIN_VALUE;
+    for (Pair<String, List<LogFileInfo>> status : statuses) {
+      for (LogFileInfo info : status.getSecond()) {
+        if (info.getLastIndex() > maxIndex) {
+          maxIndex = info.getLastIndex();
+        }
+        if (info.getInitialIndex() < minIndex) {
+          minIndex = info.getInitialIndex();
+        }
+      }
+    }
+    assertEquals("Min committed index should be 1", 1, minIndex);
+    assertTrue("Max committed index should be greater than 1", maxIndex > 1);
+    LOG.debug("Fetch log files for range [" + minIndex + ", " + maxIndex + "]");
+
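+    // Build fetch tasks from the reported statuses; together they should
+    // cover every index from minIndex up to maxIndex.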
+    Collection<FetchTask> tasks = fetcher.createFetchTasks(statuses, minIndex);
+    for (FetchTask task : tasks) {
+      LOG.debug(task.toString());
+    }
+    validateIndexCoverage(minIndex, maxIndex, tasks);
+
+    // Stop the client load
+    loader.stopReplicationLoad();
+  }
+
+  /**
+   * Validate that the indices covered by all fetch tasks merge into a single
+   * contiguous range spanning [minIndex, maxIndex].
+   */
+  private void validateIndexCoverage(long minIndex, long maxIndex, Collection<FetchTask> tasks) {
+    List<Interval> intervals = new ArrayList<>();
+    for (FetchTask task : tasks) {
+      List<LogFileInfo> fileInfos = task.getFileInfos();
+      for (LogFileInfo info : fileInfos) {
+        Interval interval = new Interval(info.getInitialIndex(), info.getLastIndex());
+        intervals.add(interval);
+      }
+    }
+
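+    // Sort intervals by start index (breaking ties on end index) so that
+    // overlapping or adjacent ranges can be merged in a single pass.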
+    Collections.sort(intervals, new Comparator<Interval>() {
+      @Override
+      public int compare(Interval i1, Interval i2) {
+        if (i1.start < i2.start) {
+          return -1;
+        } else if (i1.start > i2.start) {
+          return 1;
+        } else {
+          if (i1.end < i2.end) {
+            return -1;
+          } else if (i1.end > i2.end) {
+            return 1;
+          } else {
+            return 0;
+          }
+        }
+      }
+    });
+
+    // Merge sorted intervals into a set of minimum discrete intervals
+    ArrayList<Interval> res = new ArrayList<>();
+    for (Interval cur : intervals) {
+      if (res.isEmpty()) {
+        res.add(cur);
+      } else {
+        Interval last = res.get(res.size() - 1);
+        if (last.end >= cur.start - 1) {
+          last.end = Math.max(last.end, cur.end);
+        } else {
+          res.add(cur);
+        }
+      }
+    }
+
+    assertEquals("Indices should merge into one interval", 1, res.size());
+    Interval interval = res.get(0);
+    assertEquals("Min index should match", minIndex, interval.start);
+    assertEquals("Max index should match", maxIndex, interval.end);
+  }
+
+  private class Interval {
+    public long start;
+    public long end;
+    public Interval(long s, long e) { start = s; end = e; }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestTransactionLogCreator.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestTransactionLogCreator.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestTransactionLogCreator.java
new file mode 100644
index 0000000..bdc7d92
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/log/TestTransactionLogCreator.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.hbase.consensus.log;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTransactionLogCreator {
+
+  @Test
+  public void testNewLogfileCreation() throws Exception {
+    String logDir = "/tmp/testNewLogfileCreation";
+    File logDirFile = new File(logDir);
+    if (logDirFile.exists()) {
+      FileUtils.deleteDirectory(logDirFile);
+    }
+    logDirFile.mkdir();
+    Configuration conf = new Configuration();
+    conf.set(HConstants.RAFT_TRANSACTION_LOG_DIRECTORY_KEY, logDir);
+    TransactionLogManager logManager =
+        new TransactionLogManager(conf, "test", HConstants.UNDEFINED_TERM_INDEX);
+    logManager.initialize(null);
+    // Wait for new logs to be created
+    Thread.sleep(1000);
+    File currentLogDir = new File(logDir + "/test/current/");
+    File[] files = currentLogDir.listFiles();
+    int expected = HConstants.RAFT_MAX_NUM_NEW_LOGS;
+    assertEquals("# of new log files", expected, files.length);
+
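+    // Forcing a roll should cause exactly one additional log file to appear.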
+    logManager.forceRollLog();
+    Thread.sleep(1000);
+    files = currentLogDir.listFiles();
+    expected++;
+    assertEquals("# of new log files", expected, files.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/metrics/TestPeerMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/metrics/TestPeerMetrics.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/metrics/TestPeerMetrics.java
new file mode 100644
index 0000000..85dcd72
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/metrics/TestPeerMetrics.java
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hbase.consensus.metrics;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPeerMetrics {
+  PeerMetrics metrics;
+
+  @Before
+  public void setUp() {
+    metrics = new PeerMetrics("TestTable.region", "proc1", "peer1", null);
+  }
+
+  @Test
+  public void shouldReturnName() {
+    String expectedName = String.format("%s:type=%s,name=%s,proc=%s,peer=%s",
+            "org.apache.hadoop.hbase.consensus", PeerMetrics.TYPE,
+            "TestTable.region", "proc1", "peer1");
+    assertEquals(expectedName, metrics.getMBeanName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestAggregateTimer.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestAggregateTimer.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestAggregateTimer.java
new file mode 100644
index 0000000..84e53f4
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestAggregateTimer.java
@@ -0,0 +1,221 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This test checks if we can run multiple timers at the same time.
+ */
+public class TestAggregateTimer {
+  public static final Log LOG = LogFactory.getLog(TestAggregateTimer.class);
+
+  class TimeoutHandlerWithCounter implements TimeoutEventHandler {
+    public AtomicInteger counter = new AtomicInteger(0);
+
+    @Override
+    public void onTimeout() {
+      counter.incrementAndGet();
+    }
+  }
+
+  AggregateTimer aggregateTimer;
+  TimeoutHandlerWithCounter c1, c2;
+  ConstituentTimer timer1, timer2;
+  long timeout1 = 100, timeout2 = 200;
+
+
+  @Before
+  public void setUp() {
+    aggregateTimer = new AggregateTimer();
+    c1 = new TimeoutHandlerWithCounter();
+    c2 = new TimeoutHandlerWithCounter();
+
+    // Default values.
+    // timer1 is supposed to be faster than timer2.
+    timeout1 = 100;
+    timeout2 = 200;
+
+    // Create the timers
+    timer1 = aggregateTimer.createTimer(
+      "timer1",
+      timeout1,
+      TimeUnit.MILLISECONDS,
+      c1
+    );
+
+    timer2 = aggregateTimer.createTimer(
+      "timer2",
+      timeout2,
+      TimeUnit.MILLISECONDS,
+      c2
+    );
+  }
+
+  @Test
+  public void testNothingFiresWhenTimersAreNotStarted() {
+    LOG.debug("Checking for timeouts not firing when the timers weren't started");
+    // Sleep for 2 * (timeout1 + timeout2); neither callback should fire.
+    try {
+      Thread.sleep(2 * (timeout1 + timeout2));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    Assert.assertEquals("Timeout for Timer 1 should not have fired",
+      0, c1.counter.get());
+    Assert.assertEquals("Timeout for Timer 2 should not have fired",
+      0, c2.counter.get());
+  }
+
+  @Test
+  public void testOnlyOneTimerFiresIfOnlyOneWasStarted() {
+    // Starting the first timer thread
+    timer1.start();
+
+    LOG.debug("Checking for timeouts not firing when only one of the timers were started");
+
+    // Sleep for 2 * (timeout1 + timeout2).
+    // Only the callback for timer1 should have fired, since only timer1 was
+    // started.
+    try {
+      Thread.sleep(2 * (timeout1 + timeout2));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Checking that the timeout fires for timer 1, and resetting the counter
+    Assert.assertTrue("Timeout for Timer 1 should have fired",
+      c1.counter.getAndSet(0) > 0);
+    Assert.assertEquals("Timeout for Timer 2 should not have fired",
+      0, c2.counter.get());
+
+  }
+
+  @Test
+  public void testTimersFireSufficiently() {
+    // Start both the timer threads
+    timer1.start();
+    timer2.start();
+
+    LOG.debug("Checking for timeouts when both timers were started");
+
+    // Sleep for 3 * (timeout1 + timeout2). Each timer should have fired at
+    // least (totalSleepTime / timeout - 1) times; subtracting one avoids
+    // timing edge cases.
+    try {
+      Thread.sleep(3 * (timeout1 + timeout2));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    long totalSleepTime = (3 * (timeout1 + timeout2));
+    long targetTimeoutsForTimer1 = (totalSleepTime / timeout1) - 1;
+    long targetTimeoutsForTimer2 = (totalSleepTime / timeout2) - 1;
+    Assert.assertTrue("Timeout for Timer 1 did not fire enough",
+      c1.counter.getAndSet(0) >= targetTimeoutsForTimer1);
+    Assert.assertTrue("Timeout for Timer 2 did not fire enough",
+      c2.counter.getAndSet(0) >= targetTimeoutsForTimer2);
+  }
+
+  @Test
+  public void testResettingATimerWorks() {
+    // Start both the timer threads
+    timer1.start();
+    timer2.start();
+
+    LOG.debug("Checking timeouts when we reset one of the timers");
+    for (int i = 0; i < 100; i++) {
+      try {
+        Thread.sleep(timeout2 / 4);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+        Thread.currentThread().interrupt();
+      }
+      timer2.reset();
+    }
+    LOG.debug("Done with the reset of the timers");
+
+    // As expected, the timeout for timer 2 should not have fired
+    Assert.assertEquals(
+      "Timeout for Timer 2 should not be firing because we were resetting it",
+      0, c2.counter.getAndSet(0));
+
+    // Timer 1 wasn't touched, so it should still have fired
+    Assert.assertTrue(
+      "Timeout for Timer 1 should have fired, since we did not reset it",
+      c1.counter.getAndSet(0) > 0
+    );
+  }
+
+  @Test
+  public void testStartAndStopOfATimer() {
+    testTimersFireSufficiently();
+
+    // Stop timer 1 now
+    timer1.stop();
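+    // Discard any timeouts counted before the stop took effect.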
+    c1.counter.getAndSet(0);
+
+    // Sleep for 3 * (timeout1 + timeout2).
+    // The callback for timer1 should no longer fire, since it was stopped;
+    // timer2 should keep firing.
+    try {
+      Thread.sleep(3 * (timeout1 + timeout2));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    // As expected, the timeout for timer 1 should not have fired
+    Assert.assertEquals(
+      "Timeout for Timer 1 should not be firing because we have stopped it",
+      0, c1.counter.getAndSet(0));
+
+    // Timer 2 wasn't touched, so it should still have fired
+    Assert.assertTrue(
+      "Timeout for Timer 2 should have fired, since we did not stop it",
+      c2.counter.getAndSet(0) > 0
+    );
+  }
+
+  @Test
+  public void testStopAndStartWithBackoff() {
+    // Making the second timer slower to reproduce this case.
+    timeout2 = 500;
+    timer2.setDelay(timeout2, TimeUnit.MILLISECONDS);
+
+    timer1.start();
+    timer2.start();
+
+    // Assuming right now is t=0,
+    // The event for timer2 has been queued to occur at t=timeout2,
+    // t=2*timeout2, and so on.
+
+    // Stop timer2.
+    // The old event should not be processed for some time, since timer1's
+    // event would be queued at t=timeout1, which is before t=timeout2.
+    // If we change the backoff for timer2, and start it again before the old
+    // event for timer2 is processed, the old event should be discarded.
+    LOG.debug("Stopping timer2");
+    timer2.stop();
+
+    // Set the backoff to a large value
+    timer2.backoff(timeout2 * 100, TimeUnit.MILLISECONDS);
+
+    // Now resume the timer
+    timer2.start();
+
+    try {
+      Thread.sleep(timeout2 * 10);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    Assert.assertEquals(
+      "The second timer should not have fired, since the backoff is very large",
+      0,
+      c2.counter.get()
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestConstituentTimer.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestConstituentTimer.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestConstituentTimer.java
new file mode 100644
index 0000000..1f56536
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestConstituentTimer.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+class TimeoutWithLatchForConstituentTimer implements TimeoutEventHandler {
+  private AtomicInteger count = new AtomicInteger(0);
+  private final TestConstituentTimer test;
+
+  public TimeoutWithLatchForConstituentTimer(final TestConstituentTimer test) {
+    this.test = test;
+  }
+
+  public int getCount() {
+    return count.get();
+  }
+
+  @Override
+  public void onTimeout() {
+    count.incrementAndGet();
+    test.latch.countDown();
+  }
+}
+
+/**
+ * Test that the ConstituentTimer behaves like a regular RepeatingTimer.
+ */
+public class TestConstituentTimer {
+  public CountDownLatch latch;
+  public ConstituentTimer timer;
+  public TimeoutWithLatchForConstituentTimer callback;
+
+  @Before
+  public void setUp() {
+    latch = new CountDownLatch(1);
+    callback = new TimeoutWithLatchForConstituentTimer(this);
+    timer = new AggregateTimer().createTimer("timer", 100,
+      TimeUnit.MILLISECONDS, callback);
+  }
+
+  @Test(timeout=1000)
+  public void shouldFireOnTimeout() throws InterruptedException {
+    timer.start();
+    latch.await();
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void startShouldBeIdempotent() throws InterruptedException {
+    latch = new CountDownLatch(3);
+    timer.start();
+    timer.start();
+    timer.start();
+    // Wait 50 ms to make sure the task is running.
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.stop();
+    // Wait several cycles to pass.
+    assertFalse("Latch should not reach zero",
+      latch.await(500, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldNotFireOnStop() throws InterruptedException {
+    timer.start();
+    latch.await(50, TimeUnit.MILLISECONDS); // Make sure the timer is running.
+    timer.stop();
+    assertFalse("Latch should not reach zero",
+      latch.await(200, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldFireRepeatedlyIfNotStopped() throws InterruptedException {
+    latch = new CountDownLatch(3);
+    timer.start();
+    assertTrue("Latch should reach zero",
+      latch.await(500, TimeUnit.MILLISECONDS));
+    assertEquals(3, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldDelayCallbackOnReset() throws InterruptedException {
+    long begin = System.currentTimeMillis();
+    timer.start();
+    latch.await(50, TimeUnit.MILLISECONDS);
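+    // Resetting mid-cycle should push the next timeout a full delay further out.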
+    timer.reset();
+    latch.await();
+    long end = System.currentTimeMillis();
+    // Elapsed time should be >=150 milliseconds by now.
+    assertTrue(end - begin >= 150);
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldNotFireOnShutdown() throws InterruptedException {
+    timer.start();
+    timer.shutdown();
+    assertFalse("Latch should not reach zero",
+      latch.await(200, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldRunTheUsualFollowerPattern() throws InterruptedException {
+    timer.start();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.reset();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.stop();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    assertEquals("onTimeout handler should not have executed yet", 0,
+      callback.getCount());
+    timer.start();
+    latch.await();
+    timer.stop();
+    synchronized (this) {
+      this.wait(300);  // Wait for three more timer cycles.
+    }
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test
+  public void shouldBackoff() throws InterruptedException {
+    long delay = 200;
+    timer.setDelay(delay, TimeUnit.MILLISECONDS);
+    timer.start();
+    timer.stop();
+    timer.backoff(delay * 10, TimeUnit.MILLISECONDS);
+    timer.start();
+    synchronized (this) {
+      this.wait(delay * 5);
+    }
+    // We started and stopped the timer, followed by starting it with a large
+    // backoff again. The timer shouldn't have fired before the back off
+    // duration.
+    assertEquals(0, callback.getCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestRepeatingTimer.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestRepeatingTimer.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestRepeatingTimer.java
new file mode 100644
index 0000000..0fc2b03
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/quorum/TestRepeatingTimer.java
@@ -0,0 +1,143 @@
+package org.apache.hadoop.hbase.consensus.quorum;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+class TimeoutWithLatch implements TimeoutEventHandler {
+  private AtomicInteger count = new AtomicInteger(0);
+  private final TestRepeatingTimer test;
+
+  public TimeoutWithLatch(final TestRepeatingTimer test) {
+    this.test = test;
+  }
+
+  public int getCount() {
+    return count.get();
+  }
+
+  @Override
+  public void onTimeout() {
+    count.incrementAndGet();
+    test.latch.countDown();
+  }
+}
+
+public class TestRepeatingTimer {
+  public CountDownLatch latch;
+  public RepeatingTimer timer;
+  public TimeoutWithLatch callback;
+
+  @Before
+  public void setUp() {
+    latch = new CountDownLatch(1);
+    callback = new TimeoutWithLatch(this);
+    timer = new RepeatingTimer("test", 100, TimeUnit.MILLISECONDS, callback);
+  }
+
+  @Test(timeout=1000)
+  public void shouldFireOnTimeout() throws InterruptedException {
+    timer.start();
+    latch.await();
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void startShouldBeIdempotent() throws InterruptedException {
+    latch = new CountDownLatch(3);
+    timer.start();
+    timer.start();
+    timer.start();
+    // Wait 50 ms to make sure the task is running.
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.stop();
+    // Wait several cycles to pass.
+    assertFalse("Latch should not reach zero",
+            latch.await(500, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldNotFireOnStop() throws InterruptedException {
+    timer.start();
+    latch.await(50, TimeUnit.MILLISECONDS); // Make sure the timer is running.
+    timer.stop();
+    assertFalse("Latch should not reach zero",
+            latch.await(200, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldFireRepeatedlyIfNotStopped() throws InterruptedException {
+    latch = new CountDownLatch(3);
+    timer.start();
+    assertTrue("Latch should reach zero",
+            latch.await(500, TimeUnit.MILLISECONDS));
+    assertEquals(3, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldDelayCallbackOnReset() throws InterruptedException {
+    long begin = System.currentTimeMillis();
+    timer.start();
+    latch.await(50, TimeUnit.MILLISECONDS);
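+    // Resetting mid-cycle should push the next timeout a full delay further out.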
+    timer.reset();
+    latch.await();
+    long end = System.currentTimeMillis();
+    // Elapsed time should be >150 milliseconds by now.
+    assertTrue(end - begin > 150);
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldNotFireOnShutdown() throws InterruptedException {
+    timer.start();
+    timer.shutdown();
+    assertFalse("Latch should not reach zero",
+            latch.await(200, TimeUnit.MILLISECONDS));
+    assertEquals(0, callback.getCount());
+  }
+
+  @Test(timeout=1000)
+  public void shouldRunTheUsualFollowerPattern() throws InterruptedException {
+    timer.start();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.reset();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    timer.stop();
+    assertFalse("Timeout expected", latch.await(50, TimeUnit.MILLISECONDS));
+    assertEquals("onTimeout handler should not have executed yet", 0,
+            callback.getCount());
+    timer.start();
+    latch.await();
+    timer.stop();
+    synchronized (this) {
+      this.wait(300);  // Wait for three more timer cycles.
+    }
+    assertEquals(1, callback.getCount());
+  }
+
+  @Test
+  public void shouldBackoff() throws InterruptedException {
+    long delay = 200;
+    timer.setDelay(delay, TimeUnit.MILLISECONDS);
+    timer.start();
+    timer.stop();
+    timer.backoff(delay * 10, TimeUnit.MILLISECONDS);
+    timer.start();
+    synchronized (this) {
+      this.wait(delay * 5);
+    }
+    // We started and stopped the timer, followed by starting it with a large
+    // backoff again. The timer shouldn't have fired before the back off
+    // duration.
+    assertEquals(0, callback.getCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestParser.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestParser.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestParser.java
new file mode 100644
index 0000000..334fe09
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestParser.java
@@ -0,0 +1,93 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestParser {
+  Configuration conf;
+  Parser parser;
+  JSONObject rmapAsJSON;
+
+  @Before
+  public void setUp() throws IOException, JSONException {
+    conf = HBaseConfiguration.create();
+    conf.set(HConstants.HYDRABASE_DCNAME, "DUMMYCLUSTER1");
+
+    parser = new Parser(conf);
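+    // Load the rmap.json test resource once; individual tests pull table
+    // definitions out of it via getTableObjectFromJSON().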
+    rmapAsJSON = new JSONObject(new String(Files.readAllBytes(
+            Paths.get(getClass().getResource("rmap.json").getPath()))));
+  }
+
+  @Test
+  public void testParseRMap() throws IOException, JSONException {
+    List<HRegionInfo> regions = parser.parseTable("RPCBenchmarkingTable",
+            getTableObjectFromJSON("RPCBenchmarkingTable"));
+    assertEquals(3, regions.size());
+    HRegionInfo region = regions.get(0);
+    HTableDescriptor table = region.getTableDesc();
+    assertEquals("RPCBenchmarkingTable", table.getNameAsString());
+    assertFalse(table.isMetaTable());
+    assertFalse(table.isRootRegion());
+    HColumnDescriptor[] columnFamilies = table.getColumnFamilies();
+    assertEquals(1, columnFamilies.length);
+    HColumnDescriptor cf0 = columnFamilies[0];
+    assertEquals("cf", cf0.getNameAsString());
+    assertEquals("true", cf0.getValue("BLOCKCACHE"));
+    assertEquals("65536", cf0.getValue("BLOCKSIZE"));
+    assertEquals("NONE", cf0.getValue("BLOOMFILTER"));
+    assertEquals("0.01", cf0.getValue("BLOOMFILTER_ERRORRATE"));
+    assertEquals("NONE", cf0.getValue("COMPRESSION"));
+    assertEquals("NONE", cf0.getValue("DATA_BLOCK_ENCODING"));
+    assertEquals("true", cf0.getValue("ENCODE_ON_DISK"));
+    assertEquals("false", cf0.getValue("IN_MEMORY"));
+    assertEquals("0", cf0.getValue("REPLICATION_SCOPE"));
+    assertEquals("2147483647", cf0.getValue("TTL"));
+    assertEquals("2147483647", cf0.getValue("VERSIONS"));
+
+    assertEquals("aeeb54dc6fbca609443bd35796b59da5", region.getEncodedName());
+    assertEquals("", Bytes.toString(region.getStartKey()));
+    assertEquals("2aaaaaaa", Bytes.toString(region.getEndKey()));
+    assertEquals(1373324048180L, region.getRegionId());
+
+    InetSocketAddress[] favoredNodes =
+            region.getFavoredNodesMap().get("DUMMYCLUSTER1");
+    assertEquals(3, favoredNodes.length);
+    assertEquals(new InetSocketAddress("10.159.9.49", 60020), favoredNodes[0]);
+    assertEquals(new InetSocketAddress("10.159.9.45", 60020), favoredNodes[1]);
+    assertEquals(new InetSocketAddress("10.159.9.47", 60020), favoredNodes[2]);
+
+    Map<String, Map<HServerAddress, Integer>> peers = region.getPeers();
+    assertEquals(1, peers.size());
+    Map<HServerAddress, Integer> peersWithRank = region.getPeersWithRank();
+    assertEquals(3, peersWithRank.size());
+    assertEquals(new Integer(1),
+            peersWithRank.get(new HServerAddress("10.159.9.41:60020")));
+    assertEquals(new Integer(2),
+            peersWithRank.get(new HServerAddress("10.159.9.45:60020")));
+    assertEquals(new Integer(3),
+            peersWithRank.get(new HServerAddress("10.159.9.47:60020")));
+    assertEquals(peers.get("DUMMYCLUSTER1"), peersWithRank);
+
+    assertEquals(null, peersWithRank.get(new HServerAddress("1.1.1.1:11111")));
+  }
+
+  private JSONObject getTableObjectFromJSON(final String name)
+          throws JSONException {
+    return rmapAsJSON.getJSONObject("tables").getJSONObject(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestRMapConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestRMapConfiguration.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestRMapConfiguration.java
new file mode 100644
index 0000000..4c22dc1
--- /dev/null
+++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/rmap/TestRMapConfiguration.java
@@ -0,0 +1,55 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMapConfiguration {
+  private Configuration conf;
+  private RMapConfiguration rmapConf;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set(HConstants.RMAP_SUBSCRIPTION,
+            getClass().getResource("rmap.json").toURI().toString());
+    conf.set(HConstants.HYDRABASE_DCNAME, "DUMMYCLUSTER1");
+
+    rmapConf = new RMapConfiguration(conf);
+    URI uri = RMapConfiguration.getRMapSubscription(conf);
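+    // Read the RMap from the subscription URI and mark it as applied, so
+    // isRMapApplied() returns true in shouldApplyRMap().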
+    rmapConf.readRMap(uri);
+    rmapConf.appliedRMap(uri);
+  }
+
+  @Test
+  public void testReadingEmptyRMapSubscription() {
+    conf.set(HConstants.RMAP_SUBSCRIPTION, "");
+    assertNull("RMap subscription should be empty",
+      RMapConfiguration.getRMapSubscription(conf));
+  }
+
+  @Test
+  public void testReadingNonEmptyRMapSubscription()
+          throws URISyntaxException {
+    conf.set(HConstants.RMAP_SUBSCRIPTION,
+            "hbase/rmaps/map1");
+    URI expectedRMapSubscription = new URI("hbase/rmaps/map1");
+    assertEquals(expectedRMapSubscription,
+            RMapConfiguration.getRMapSubscription(conf));
+  }
+
+  @Test
+  public void shouldApplyRMap() {
+    URI subscription = RMapConfiguration.getRMapSubscription(conf);
+    assertTrue(rmapConf.isRMapApplied(subscription));
+  }
+}

