hbase-commits mailing list archives

From ecl...@apache.org
Subject [08/19] hbase git commit: HBASE-12476 HydraBase consensus protocol
Date Tue, 25 Nov 2014 20:29:01 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/HDFSReader.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/HDFSReader.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/HDFSReader.java
new file mode 100644
index 0000000..7d6b0f7
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/HDFSReader.java
@@ -0,0 +1,140 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class HDFSReader extends RMapReader {
+  protected static final Logger LOG = LoggerFactory.getLogger(HDFSReader.class);
+
+  private Configuration conf;
+
+  public HDFSReader(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public List<Long> getVersions(URI uri) throws IOException {
+    Path path = new Path(getSchemeAndPath(uri));
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] statuses = fs.globStatus(new Path(path.toString() + ".*"));
+
+    List<Long> versions = new ArrayList<>(statuses.length);
+    for (FileStatus status : statuses) {
+      long version = getVersionFromPath(status.getPath().toString());
+      if (version > 0) {
+        versions.add(version);
+      }
+    }
+    Collections.sort(versions);
+    return versions;
+  }
+
+  @Override
+  public URI resolveSymbolicVersion(URI uri) throws URISyntaxException {
+    long version = getVersion(uri);
+    String schemeAndPath = getSchemeAndPath(uri);
+
+    if (version == RMapReader.CURRENT || version == RMapReader.NEXT) {
+      Path link = new Path(String.format("%s.%s", schemeAndPath,
+              version == RMapReader.CURRENT ? "CURRENT" : "NEXT"));
+      // Resolve to an explicit version, or UNKNOWN
+      try {
+        Path target = getLinkTarget(link);
+        version = target != null ? getVersionFromPath(target.toString()) :
+                RMapReader.UNKNOWN;
+      } catch (IOException e) {
+        LOG.error("Failed to look up version from link:", e);
+        version = RMapReader.UNKNOWN;
+      }
+    }
+
+    if (version > 0) {
+      return new URI(String.format("%s?version=%d", schemeAndPath, version));
+    }
+    return new URI(schemeAndPath);
+  }
+
+  @Override
+  public String readRMapAsString(final URI uri) throws IOException {
+    // Get the file status; throws IOException if the path does not exist.
+    Path path = getPathWithVersion(uri);
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+
+    long n = status.getLen();
+    if (n < 0 || n > MAX_SIZE_BYTES) {
+      throw new IOException(String.format("Invalid RMap file size " +
+              "(expected between 0 and %d but got %d bytes)",
+              MAX_SIZE_BYTES, n));
+    }
+
+    byte[] buf = new byte[(int) n];
+    try (FSDataInputStream stream = fs.open(path)) {
+      stream.readFully(buf);
+    }
+    return Bytes.toString(buf);
+  }
+
+  public Path getPathWithVersion(final URI uri) throws IOException {
+    long version = RMapReader.UNKNOWN;
+    try {
+      version = getVersion(resolveSymbolicVersion(uri));
+    } catch (URISyntaxException e) {
+      // Ignore invalid URIs and assume version UNKNOWN
+    }
+
+    if (version > 0) {
+      return new Path(String.format("%s.%d", getSchemeAndPath(uri), version));
+    }
+    return new Path(uri.toString());
+  }
+
+  private long getVersionFromPath(final String path) {
+    String[] tokens = path.split("[\\.]");
+    try {
+      return Long.parseLong(tokens[tokens.length - 1]);
+    } catch (NumberFormatException e) {
+      // Skip if token not numerical
+    }
+    return RMapReader.UNKNOWN;
+  }
+
+  private Path getLinkTarget(final Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+
+    // The getHardLinkedFiles call is a bit tricky, as it effectively returns
+    // all other paths to the inode shared with the given path. In order to
+    // guard against erroneous links, only consider those where the paths
+    // are the same, up to the version.
+    String pathWithoutVersion = path.toString().substring(0,
+            path.toString().lastIndexOf('.'));
+    /*
+TODO (FIXME, Amit): this code works with the internal HDFS build; it might
+not work with the OSS version.
+
+    for (String link : fs.getHardLinkedFiles(path)) {
+      if (link.startsWith(pathWithoutVersion) &&
+              getVersionFromPath(link) > 0) {
+        return new Path(link);
+      }
+    }
+    */
+    return null;
+  }
+}
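
For reference, a minimal sketch of how this reader is driven. It assumes an
RMap published under a hypothetical HDFS path, with explicit versions stored
as numeric suffixes next to the base path (rmap.json.1, rmap.json.2, ...):

    import java.net.URI;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.consensus.rmap.HDFSReader;
    import org.apache.hadoop.hbase.consensus.rmap.RMapReader;

    public class HDFSReaderSketch {
      public static void main(String[] args) throws Exception {
        HDFSReader reader = new HDFSReader(new Configuration());
        // Explicit versions are numeric suffixes next to the base path.
        URI base = new URI("hdfs://namenode:8020/hydrabase/rmap.json");
        List<Long> versions = reader.getVersions(base);  // e.g. [1, 2]
        // Read the latest explicit version (resolved to base path + "." + version).
        long latest = versions.get(versions.size() - 1);
        String json = reader.readRMapAsString(RMapReader.getVersionedURI(base, latest));
        System.out.println(json.length() + " bytes of RMap JSON");
      }
    }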

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/LocalReader.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/LocalReader.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/LocalReader.java
new file mode 100644
index 0000000..fc1e877
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/LocalReader.java
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class LocalReader extends RMapReader {
+  protected static final Logger LOG = LoggerFactory.getLogger(
+          LocalReader.class);
+
+  @Override
+  public List<Long> getVersions(final URI uri) throws IOException {
+    Path path = Paths.get(uri);
+    List<Long> versions = new ArrayList<>();
+
+    for (Path match : Files.newDirectoryStream(path.getParent(),
+            path.getFileName() + ".*")) {
+      long version = getVersionFromPath(match.toString());
+      if (version > 0) {
+        versions.add(version);
+      }
+    }
+    Collections.sort(versions);
+    return versions;
+  }
+
+  @Override
+  public URI resolveSymbolicVersion(URI uri) throws URISyntaxException {
+    long version = getVersion(uri);
+    String schemeAndPath = getSchemeAndPath(uri);
+
+    if (version == RMapReader.CURRENT || version == RMapReader.NEXT) {
+      Path link = Paths.get(String.format("%s.%s", schemeAndPath,
+              version == RMapReader.CURRENT ? "CURRENT" : "NEXT"));
+      // Resolve to an explicit version, or UNKNOWN
+      try {
+        version = getVersionFromPath(Files.readSymbolicLink(link).toString());
+      } catch (IOException e) {
+        LOG.error("Failed to look up version from link:", e);
+        version = RMapReader.UNKNOWN;
+      }
+    }
+
+    if (version > 0) {
+      return new URI(String.format("%s?version=%d", schemeAndPath, version));
+    }
+    return new URI(schemeAndPath);
+  }
+
+  @Override
+  public String readRMapAsString(final URI uri) throws IOException {
+    Path path = getPathWithVersion(uri);
+
+    long n = Files.size(path);
+    if (n < 0 || n > MAX_SIZE_BYTES) {
+      throw new IOException(String.format("Invalid RMap file size " +
+              "(expected between 0 and %d but got %d bytes)",
+              MAX_SIZE_BYTES, n));
+    }
+
+    return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+  }
+
+  private long getVersionFromPath(final String path) {
+    String[] tokens = path.split("[\\.]");
+    try {
+      return Long.parseLong(tokens[tokens.length - 1]);
+    } catch (NumberFormatException e) {
+      // Skip if token not numerical
+    }
+    return RMapReader.UNKNOWN;
+  }
+
+  private Path getPathWithVersion(final URI uri) {
+    long version = RMapReader.UNKNOWN;
+    try {
+      version = getVersion(resolveSymbolicVersion(uri));
+    } catch (URISyntaxException e) {
+      // Ignore invalid URIs and assume version UNKNOWN
+    }
+
+    if (version > 0) {
+      return Paths.get(String.format("%s.%d", uri.getPath(), version));
+    }
+    return Paths.get(uri);
+  }
+}
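
The same flow against the local filesystem. The directory layout below is
hypothetical: explicit versions as numeric suffixes, with the CURRENT and
NEXT symbolic versions maintained as symlinks (e.g. rmap.json.CURRENT ->
rmap.json.2) that resolveSymbolicVersion() reads:

    import java.net.URI;
    import java.util.List;
    import org.apache.hadoop.hbase.consensus.rmap.LocalReader;
    import org.apache.hadoop.hbase.consensus.rmap.RMapReader;

    public class LocalReaderSketch {
      public static void main(String[] args) throws Exception {
        LocalReader reader = new LocalReader();
        // getVersions() globs "<file>.*" in the parent directory and keeps
        // the numeric suffixes.
        URI base = new URI("file:///tmp/rmaps/rmap.json");
        List<Long> versions = reader.getVersions(base);  // e.g. [1, 2]
        long latest = versions.get(versions.size() - 1);
        System.out.println(
            reader.readRMapAsString(RMapReader.getVersionedURI(base, latest)));
      }
    }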

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/NoSuchRMapException.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/NoSuchRMapException.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/NoSuchRMapException.java
new file mode 100644
index 0000000..6136063
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/NoSuchRMapException.java
@@ -0,0 +1,10 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class NoSuchRMapException extends IOException {
+  public NoSuchRMapException(final URI uri) {
+    super("No RMap found with URI " + uri);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/Parser.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/Parser.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/Parser.java
new file mode 100644
index 0000000..f345b1a
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/Parser.java
@@ -0,0 +1,146 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Parser {
+  private Configuration conf;
+
+  public Parser(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  public List<HRegionInfo> parseEncodedRMap(JSONObject encodedRMap)
+          throws JSONException {
+    List<HRegionInfo> regions = new ArrayList<>();
+    JSONObject tables = encodedRMap.getJSONObject("tables");
+
+    for (Iterator<String> names = tables.keys(); names.hasNext();) {
+      String name = names.next();
+      regions.addAll(parseTable(name, tables.getJSONObject(name)));
+    }
+
+    return regions;
+  }
+
+  public List<HRegionInfo> parseTable(String name, JSONObject table)
+          throws JSONException {
+    HTableDescriptor tableDesc = new HTableDescriptor(name);
+    List<HRegionInfo> regions = Collections.emptyList();
+    Iterator<String> keys = table.keys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      if (key.equals("families")) {
+        JSONObject families = table.getJSONObject(key);
+        Iterator<String> familyKeys = families.keys();
+        while (familyKeys.hasNext()) {
+          String familyName = familyKeys.next();
+          JSONObject familyJson = families.getJSONObject(familyName);
+          tableDesc.addFamily(parseFamily(familyName, familyJson));
+        }
+      } else if (key.equals("regions")) {
+        JSONArray regionsJson = table.getJSONArray(key);
+        int length = regionsJson.length();
+        regions = new ArrayList<>(length);
+        for (int i = 0; i < length; ++i) {
+          regions.add(parseRegion(tableDesc, regionsJson.getJSONObject(i)));
+        }
+      } else {
+        String value = table.get(key).toString();
+        tableDesc.setValue(key, value);
+      }
+    }
+    return regions;
+  }
+
+  public HColumnDescriptor parseFamily(String name, JSONObject family)
+          throws JSONException {
+    HColumnDescriptor columnDesc = new HColumnDescriptor();
+    columnDesc.setName(Bytes.toBytes(name));
+    Iterator<String> keys = family.keys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      String value = family.get(key).toString();
+      columnDesc.setValue(key, value);
+    }
+    return columnDesc;
+  }
+
+  public HRegionInfo parseRegion(HTableDescriptor table, JSONObject region)
+          throws JSONException {
+    long id = region.getLong("id");
+    byte[] startKey = Bytes.toBytes(region.getString("start_key"));
+    byte[] endKey = Bytes.toBytes(region.getString("end_key"));
+    Map<String, Map<HServerAddress, Integer>> peers = parsePeers(region
+            .getJSONObject("peers"));
+    Map<String, InetSocketAddress[]> favoredNodesMap = parseFavoredNodesMap(region
+            .getJSONObject("favored_nodes"));
+    return new HRegionInfo(table, startKey, endKey, false, id, peers,
+            favoredNodesMap);
+  }
+
+  public Map<String, Map<HServerAddress, Integer>> parsePeers(JSONObject peersJson)
+          throws JSONException {
+    Map<String, Map<HServerAddress, Integer>> peers = new LinkedHashMap<>();
+    Iterator<String> keys = peersJson.keys();
+    while (keys.hasNext()) {
+      String cellName = keys.next();
+      JSONArray peersWithRank = peersJson.getJSONArray(cellName);
+      peers.put(cellName, parsePeersWithRank(peersWithRank));
+    }
+    return peers;
+  }
+
+  public Map<HServerAddress, Integer> parsePeersWithRank(JSONArray peersJson)
+          throws JSONException {
+    Map<HServerAddress, Integer> peers = new LinkedHashMap<HServerAddress, Integer>();
+    for (int i = 0; i < peersJson.length(); ++i) {
+      String peer = peersJson.getString(i);
+      int colonIndex = peer.lastIndexOf(':');
+      peers.put(new HServerAddress(peer.substring(0, colonIndex)),
+              Integer.valueOf(peer.substring(colonIndex + 1)));
+    }
+    return peers;
+  }
+
+  Map<String, InetSocketAddress[]> parseFavoredNodesMap(JSONObject favoredNodesJson)
+          throws JSONException {
+    Iterator<String> keys = favoredNodesJson.keys();
+
+    HashMap<String, InetSocketAddress[]> favoredNodesMap = new HashMap<>();
+    while (keys.hasNext()) {
+      String cellName = keys.next();
+      JSONArray peersWithRank = favoredNodesJson.getJSONArray(cellName);
+      favoredNodesMap.put(cellName, parseFavoredNodes(peersWithRank));
+    }
+    return favoredNodesMap;
+  }
+
+  public InetSocketAddress[] parseFavoredNodes(JSONArray favoredNodesInCell)
+          throws JSONException {
+    if (favoredNodesInCell == null) {
+      return null;
+    } else {
+      int length = favoredNodesInCell.length();
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[length];
+      for (int i = 0; i < length; ++i) {
+        String node = favoredNodesInCell.getString(i);
+        int colonIndex = node.lastIndexOf(':');
+        favoredNodes[i] = new InetSocketAddress(node.substring(0, colonIndex),
+                Integer.parseInt(node.substring(colonIndex + 1)));
+
+      }
+      return favoredNodes;
+    }
+  }
+}
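
The JSON shape the parser expects can be read off the methods above. A
minimal hand-built document parses as follows within this module (the
HRegionInfo constructor taking peers and favored nodes is specific to
hbase-consensus; hosts, ports and ranks below are made up):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.consensus.rmap.Parser;
    import org.json.JSONObject;

    public class ParserSketch {
      public static void main(String[] args) throws Exception {
        // tables -> table name -> { families, regions, other attributes }
        String json =
            "{\"tables\": {\"t1\": {"
          + "\"families\": {\"cf\": {}},"
          + "\"regions\": [{"
          + "\"id\": 1, \"start_key\": \"\", \"end_key\": \"\","
          + "\"peers\": {\"cell1\": [\"host1:60020:1\"]},"  // host:port:rank
          + "\"favored_nodes\": {\"cell1\": [\"host1:60020\"]}}]}}}";
        List<HRegionInfo> regions =
            new Parser(new Configuration()).parseEncodedRMap(new JSONObject(json));
        System.out.println(regions.size());  // 1 region of table t1
      }
    }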

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapConfiguration.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapConfiguration.java
new file mode 100644
index 0000000..00306dc
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapConfiguration.java
@@ -0,0 +1,330 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class RMapConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(RMapConfiguration.class);
+
+  private Configuration conf;
+
+  private Map<String, RMap> appliedRMaps;
+  private Map<URI, RMap> cachedRMaps;
+
+  public RMapConfiguration(final Configuration conf) {
+    this.conf = conf;
+    this.appliedRMaps = new HashMap<>();
+    this.cachedRMaps = new HashMap<>();
+  }
+
+  public static URI getRMapSubscription(final Configuration conf) {
+    String[] subscriptionsList =
+      conf.get(HConstants.RMAP_SUBSCRIPTION, "").split(",");
+    if (subscriptionsList.length > 1) {
+      LOG.warn(String.format("We do not support multiple RMaps. " +
+        "Using the first RMap as the correct one: %s", subscriptionsList[0]));
+    }
+    if (subscriptionsList.length >= 1 && !subscriptionsList[0].equals("")) {
+      try {
+        return new URI(subscriptionsList[0]);
+      } catch (URISyntaxException e) {
+        LOG.warn(String.format("Failed to parse URI for subscription %s: ",
+          subscriptionsList[0]), e);
+      }
+    }
+    return null;
+  }
+
+  public static RMapReader getRMapReader(final Configuration conf,
+          final URI uri) throws RMapException {
+    switch (uri.getScheme()) {
+      case "file":
+        return new LocalReader();
+      case "hdfs":
+        return new HDFSReader(conf);
+      default:
+        throw new RMapException("No reader found for RMap: " + uri);
+    }
+  }
+
+  public synchronized RMap getRMap(URI uri)
+      throws IOException, RMapException {
+    return getRMap(uri, false);
+  }
+
+  public synchronized RMap getRMap(URI uri, boolean reload)
+      throws IOException, RMapException {
+    try {
+      RMapReader reader = getRMapReader(conf, uri);
+      URI nonSymbolicURI = reader.resolveSymbolicVersion(uri);
+      // Try to get a cached instance of the RMap.
+      RMap rmap = cachedRMaps.get(nonSymbolicURI);
+      if (reload || rmap == null) {
+        // No cached instance was found, read it using the reader.
+        RMapJSON encodedRMap = reader.readRMap(nonSymbolicURI);
+        rmap = new RMap(encodedRMap.uri,
+            new Parser(conf).parseEncodedRMap(encodedRMap.getEncodedRMap()),
+            encodedRMap.signature);
+        cachedRMaps.put(rmap.uri, rmap);
+      }
+      return rmap;
+    } catch (URISyntaxException e) {
+      throw new RMapException("URI syntax invalid for RMap: " + uri, e);
+    } catch (JSONException e) {
+      throw new RMapException("Failed to decode JSON for RMap: " + uri, e);
+    }
+  }
+
+  /**
+   * Reads and caches the RMap from the given URI and returns its signature.
+   *
+   * @param uri URI of the RMap
+   * @return the signature of the RMap
+   */
+  public synchronized String readRMap(final URI uri) throws IOException,
+          RMapException {
+    return getRMap(uri).signature;
+  }
+
+  public synchronized String readRMap(URI uri, boolean reload)
+      throws IOException, RMapException {
+    return getRMap(uri, reload).signature;
+  }
+
+  /**
+   * Get the list of regions which need to be updated in order to transition
+   * to the version of the RMap given by the URI.
+   *
+   * @param uri URI of the RMap
+   * @return the regions which differ from the currently applied RMap
+   */
+  public synchronized Collection<HRegionInfo> getTransitionDelta(final URI uri)
+          throws IOException, RMapException {
+    RMap nextRMap = getRMap(uri);
+    RMap currentRMap = appliedRMaps.get(RMapReader.getSchemeAndPath(uri));
+
+    // The standard Set implementations use compareTo() for their
+    // operations, but on HRegionInfo objects compareTo() and equals() have
+    // different properties, and equals() is what is needed here. What
+    // follows is a poor man's Set comparison to determine which regions
+    // need to be modified to make the RMap transition.
+    if (nextRMap != null) {
+      HashMap<String, HRegionInfo> delta = new HashMap<>();
+      for (HRegionInfo next : nextRMap.regions) {
+        delta.put(next.getEncodedName(), next);
+      }
+
+      if (currentRMap != null) {
+        // Remove all regions already present in the current RMap from the
+        // delta. This should use the {@link HRegionInfo.equals} method as it
+        // should consider the favored nodes and replicas.
+        for (HRegionInfo current : currentRMap.regions) {
+          HRegionInfo next = delta.get(current.getEncodedName());
+          if (next != null) {
+            if (next.equals(current)) {
+              delta.remove(next.getEncodedName());
+            }
+          }
+        }
+      }
+
+      return delta.values();
+    }
+
+    return Collections.emptyList();
+  }
+
+  public synchronized void appliedRMap(final URI uri) throws IOException,
+          RMapException {
+    RMap previous = appliedRMaps.put(RMapReader.getSchemeAndPath(uri),
+        getRMap(uri));
+    // Purge the earlier version of the RMap from cache.
+    if (previous != null) {
+      cachedRMaps.remove(previous.uri);
+    }
+  }
+
+  public synchronized boolean isRMapApplied(final URI uri) {
+    RMap active = appliedRMaps.get(RMapReader.getSchemeAndPath(uri));
+    if (active != null) {
+      return active.uri.equals(uri);
+    }
+    return false;
+  }
+
+  public synchronized RMap getAppliedRMap(String uri) {
+    return appliedRMaps.get(uri);
+  }
+
+  public synchronized List<HRegionInfo> getRegions(final URI uri)
+          throws IOException, RMapException {
+    RMap rmap = getRMap(uri);
+    if (rmap == null) {
+      return Collections.emptyList();
+    }
+    return Collections.unmodifiableList(rmap.regions);
+  }
+
+  public synchronized void clearFromRMapCache(URI uri) {
+    cachedRMaps.remove(uri);
+  }
+
+  /**
+   * Replace the content of a cached RMap. For testing only!
+   *
+   * @param uri URI of the RMap to replace
+   * @param rMap the replacement RMap
+   */
+  public synchronized void cacheCustomRMap(URI uri, RMap rMap) {
+    cachedRMaps.put(uri, rMap);
+    appliedRMaps.put(uri.toString(), rMap);
+  }
+
+  public class RMap {
+    public final URI uri;
+    public final List<HRegionInfo> regions;
+    public final String signature;
+
+    RMap(final URI uri, final List<HRegionInfo> regions,
+         final String signature) {
+      this.uri = uri;
+      this.regions = regions;
+      this.signature = signature;
+    }
+
+    /**
+     * Return the quorum size of the RMap.
+     * @return the quorum size, or 0 if the RMap contains no regions
+     */
+    public int getQuorumSize() {
+      if (regions.size() == 0) {
+        return 0;
+      }
+      return regions.get(0).getQuorumInfo().getQuorumSize();
+    }
+
+    /**
+     * Return the list of regions that are served by the specified server.
+     * @param hServerAddress address of the server
+     * @return the regions hosted by the given server
+     */
+    public List<HRegionInfo> getRegionsForServer(HServerAddress hServerAddress) {
+      List<HRegionInfo> ret = new ArrayList<HRegionInfo>();
+      for (HRegionInfo region: regions) {
+        if (region.getPeersWithRank().containsKey(hServerAddress)) {
+          ret.add(region);
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Return the set of servers that are hosting any of the regions in the
+     * RMap.
+     * @return the set of all servers in the RMap
+     */
+    public Set<HServerAddress> getAllServers() {
+      Set<HServerAddress> ret = new HashSet<>();
+      for (HRegionInfo region: regions) {
+        ret.addAll(region.getPeersWithRank().keySet());
+      }
+      return ret;
+    }
+
+    /**
+     * Create a customized RMap for test use only! Any null argument is
+     * replaced by the corresponding field of this RMap.
+     *
+     * @param uri URI of the new RMap, or null
+     * @param regions regions of the new RMap, or null
+     * @param signature signature of the new RMap, or null
+     * @return the new RMap
+     */
+    public RMap createCustomizedRMap(URI uri,
+                                     List<HRegionInfo> regions,
+                                     String signature) {
+      return new RMapConfiguration.RMap(
+          uri == null ? this.uri : uri,
+          regions == null ? this.regions : regions,
+          signature == null ? this.signature : signature
+      );
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof RMap)) {
+        return false;
+      }
+      RMap that = (RMap)obj;
+      if (this.regions == null || that.regions == null ||
+          this.regions.size() != that.regions.size()) {
+        return false;
+      }
+      Set<HRegionInfo> regionInfos = new TreeSet<>();
+      regionInfos.addAll(regions);
+      for (HRegionInfo region : that.regions) {
+        if (!regionInfos.contains(region)) {
+          return false;
+        }
+        regionInfos.remove(region);
+      }
+      return regionInfos.isEmpty();
+    }
+  }
+
+  /**
+   * Creates a temporary name for an RMap, based on the current date and time.
+   * @return a name of the form rmap.json.yyyy-MM-dd-HHmmss
+   */
+  public static String createRMapName() {
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HHmmss");
+    return "rmap.json." + format.format(System.currentTimeMillis());
+  }
+
+  /**
+   * View information about an RMap. Currently only prints its signature.
+   * @param args command line arguments; use -r to specify the RMap URI
+   */
+  public static void main(String[] args) throws ParseException,
+    URISyntaxException, RMapException, IOException {
+    Options options = new Options();
+    options.addOption("r", "rmap", true, "Name of the rmap");
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    if (!cmd.hasOption("r")) {
+      System.out.println("Please specify the rmap with -r");
+      return;
+    }
+
+    String rmapUriStr = cmd.getOptionValue("r");
+    RMapConfiguration conf = new RMapConfiguration(new Configuration());
+    String rmapStr = conf.readRMap(new URI(rmapUriStr));
+    LOG.debug("RMap Signature: " + rmapStr);
+  }
+}
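
Putting the pieces together, a sketch of the intended flow: read an RMap,
compute the delta against whatever is currently applied, and record the
transition once the regions have been moved. The file URI is hypothetical:

    import java.net.URI;
    import java.util.Collection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.consensus.rmap.RMapConfiguration;

    public class TransitionSketch {
      public static void main(String[] args) throws Exception {
        RMapConfiguration rmaps = new RMapConfiguration(new Configuration());
        URI next = new URI("file:///tmp/rmaps/rmap.json?version=2");
        // Regions that differ from the currently applied RMap; on first use
        // this is every region, since nothing has been applied yet.
        Collection<HRegionInfo> delta = rmaps.getTransitionDelta(next);
        // ... apply the changed regions, then record the transition:
        rmaps.appliedRMap(next);
        System.out.println(rmaps.isRMapApplied(next));
      }
    }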

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapException.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapException.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapException.java
new file mode 100644
index 0000000..31621ab
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapException.java
@@ -0,0 +1,11 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+public class RMapException extends Exception {
+  public RMapException(final String message) {
+    super(message);
+  }
+
+  public RMapException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapJSON.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapJSON.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapJSON.java
new file mode 100644
index 0000000..6d06123
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapJSON.java
@@ -0,0 +1,34 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.json.JSONObject;
+
+import java.net.URI;
+
+public class RMapJSON {
+  final URI uri;
+  final JSONObject rmap;
+  final String signature;
+
+  public RMapJSON(final URI uri, final JSONObject rmap,
+                  final String signature) {
+    this.uri = uri;
+    this.rmap = rmap;
+    this.signature = signature;
+  }
+
+  public long getVersion() {
+    return RMapReader.getVersion(uri);
+  }
+
+  public URI getURI() {
+    return uri;
+  }
+
+  public JSONObject getEncodedRMap() {
+    return rmap;
+  }
+
+  public String getSignature() {
+    return signature;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapReader.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapReader.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapReader.java
new file mode 100644
index 0000000..dc81d34
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RMapReader.java
@@ -0,0 +1,205 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+
+public abstract class RMapReader {
+  /** Max file size of a single file containing an RMap */
+  public static final long MAX_SIZE_BYTES = 16 * 1024 * 1024; // 16 MB
+
+  /** RMap version special values */
+  public static final long NEXT = -2;
+  public static final long CURRENT = -1;
+  public static final long UNKNOWN = 0;
+
+  /**
+   * Return a naturally sorted list of available versions of a given RMap URI.
+   *
+   * @param uri URI of the RMap
+   * @return a naturally sorted list of versions of the given RMap URI
+   * @throws IOException if an exception occurs while reading versions
+   */
+  public abstract List<Long> getVersions(final URI uri) throws IOException;
+
+  /**
+   * Resolve a URI containing a symbolic version into a URI with an absolute
+   * value which can be requested from the reader.
+   *
+   * @param uri URI containing a symbolic version
+   * @return a URI containing an absolute version
+   * @throws URISyntaxException if the given URI is malformed
+   */
+  public abstract URI resolveSymbolicVersion(final URI uri)
+          throws URISyntaxException;
+
+  /**
+   * Return the contents of the RMap at given URI as a string.
+   *
+   * @param uri URI of the RMap
+   * @return contents of the RMap as String
+   * @throws IOException if an exception occurs while reading the RMap
+   */
+  public abstract String readRMapAsString(final URI uri) throws IOException;
+
+  /**
+   * Return the version number of the RMap specified in the given URI.
+   *
+   * @param uri URI of the RMap
+   * @return the version number of the RMap or 0 if no version was found
+   */
+  public static long getVersion(final URI uri) {
+    for (NameValuePair param : URLEncodedUtils.parse(uri, "UTF-8")) {
+      if (param.getName().equals("version")) {
+        switch (param.getValue().toUpperCase()) {
+          case "NEXT":
+            return NEXT;
+          case "CURRENT":
+            return CURRENT;
+          default:
+            try {
+              return Long.parseLong(param.getValue());
+            } catch (NumberFormatException e) {
+              /* Ignore if not a number */
+            }
+        }
+      }
+    }
+    return UNKNOWN;
+  }
+
+  public static boolean isSymbolicVersion(final URI uri) {
+    return getVersion(uri) < 0;
+  }
+
+  /**
+   * Read and return a {@link RMapJSON} of the RMap at the given URI.
+   *
+   * @param uri URI of the RMap
+   * @return a JSON representation of the RMap
+   * @throws IOException if a (possibly transient) exception occurs while
+   *        reading the RMap
+   * @throws RMapException if any other exception occurs while reading the RMap
+   */
+  public RMapJSON readRMap(final URI uri) throws IOException, RMapException {
+    URI nonSymbolicURI;
+    try {
+      nonSymbolicURI = resolveSymbolicVersion(uri);
+      String encodedRMap = readRMapAsString(nonSymbolicURI);
+      return new RMapJSON(nonSymbolicURI, new JSONObject(encodedRMap),
+              getSignature(encodedRMap));
+    } catch (URISyntaxException e) {
+      throw new RMapException("URI syntax invalid for RMap: " + uri, e);
+    } catch (JSONException e) {
+      throw new RMapException(
+              "Failed to decode JSON string for RMap: " + uri, e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new RMapException(
+              "Failed to generate signature for RMap: " + uri, e);
+    }
+  }
+
+  /**
+   * Get an MD5 hash of the given string.
+   *
+   * @param s string to be hashed
+   * @return a hex String representation of the hash
+   * @throws NoSuchAlgorithmException if MD5 message digest is unavailable
+   */
+  public static String getSignature(final String s)
+          throws NoSuchAlgorithmException {
+    return new String(Hex.encodeHex(
+            MessageDigest.getInstance("MD5").digest(s.getBytes())));
+  }
+
+  /**
+   * Get an MD5 hash of the RMap at the given URI.
+   *
+   * @param uri URI of the RMap to be hashed
+   * @return a hex String representation of the hash
+   * @throws IOException if an exception occurs while reading the RMap
+   * @throws RMapException if the URI is invalid or MD5 is unavailable
+   */
+  public String getSignature(final URI uri) throws IOException, RMapException {
+    URI nonSymbolicURI;
+    try {
+      nonSymbolicURI = resolveSymbolicVersion(uri);
+      String encodedRMap = readRMapAsString(nonSymbolicURI);
+      return getSignature(encodedRMap);
+    } catch (URISyntaxException e) {
+      throw new RMapException("URI syntax invalid for RMap: " + uri, e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new RMapException(
+              "Failed to generate signature for RMap: " + uri, e);
+    }
+  }
+
+  /**
+   * Get the scheme, authority (if present) and path of a given URI as a
+   * string.
+   * @param uri URI to take the scheme, authority and path from
+   * @return a string containing just the scheme, authority and path
+   */
+  public static String getSchemeAndPath(final URI uri) {
+    return String.format("%s:%s%s", uri.getScheme(),
+            uri.getAuthority() != null ?
+                    String.format("//%s", uri.getAuthority()) : "",
+            uri.getPath());
+  }
+
+  /**
+   * Get a versioned URI for the RMap with given scheme, path and version.
+   * @param schemeAndPath scheme, authority (if any) and path of the RMap
+   * @param version explicit version number, or one of CURRENT, NEXT and
+   *        UNKNOWN
+   * @return a URI of the form [scheme]:[//authority][path]?version=[version]
+   * @throws URISyntaxException if the resulting URI is malformed
+   */
+  public static URI getVersionedURI(final String schemeAndPath,
+          final long version) throws URISyntaxException {
+    String token = "UNKNOWN";
+
+    if (version > 0) {
+      token = String.format("%d", version);
+    } else if (version == CURRENT) {
+      token = "CURRENT";
+    } else if (version == NEXT) {
+      token = "NEXT";
+    }
+
+    return new URI(String.format("%s?version=%s", schemeAndPath, token));
+  }
+
+  /**
+   * Get a versioned URI for the RMap with given base URI and version. If the
+   * given URI already contains a version it is overwritten by the given
+   * version.
+   * @param uri base URI of the RMap
+   * @param version explicit version number, or one of CURRENT, NEXT and
+   *        UNKNOWN
+   * @return a URI of the form [scheme]:[//authority][path]?version=[version]
+   * @throws URISyntaxException if the resulting URI is malformed
+   */
+  public static URI getVersionedURI(final URI uri, final long version)
+          throws URISyntaxException {
+    return getVersionedURI(getSchemeAndPath(uri), version);
+  }
+
+  public long getCurrentVersion(final String schemeAndPath)
+          throws URISyntaxException {
+    return getVersion(resolveSymbolicVersion(
+            getVersionedURI(schemeAndPath, CURRENT)));
+  }
+
+  public long getNextVersion(final String schemeAndPath)
+          throws URISyntaxException {
+    return getVersion(resolveSymbolicVersion(
+            getVersionedURI(schemeAndPath, NEXT)));
+  }
+}
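
The static helpers make the version handling above easy to exercise in
isolation; the path below is arbitrary:

    import java.net.URI;
    import org.apache.hadoop.hbase.consensus.rmap.RMapReader;

    public class VersionedURISketch {
      public static void main(String[] args) throws Exception {
        URI uri = RMapReader.getVersionedURI("file:/tmp/rmaps/rmap.json",
            RMapReader.CURRENT);
        System.out.println(uri);                               // ...?version=CURRENT
        System.out.println(RMapReader.isSymbolicVersion(uri)); // true
        System.out.println(RMapReader.getVersion(uri));        // -1 (CURRENT)
        // Signatures are hex-encoded MD5 digests of the RMap body.
        System.out.println(RMapReader.getSignature("{\"tables\": {}}"));
      }
    }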

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RegionLocator.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RegionLocator.java
new file mode 100644
index 0000000..6dfaa57
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rmap/RegionLocator.java
@@ -0,0 +1,142 @@
+package org.apache.hadoop.hbase.consensus.rmap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class RegionLocator {
+  private static final Logger LOG = LoggerFactory.getLogger(
+          RegionLocator.class);
+
+  private Configuration conf;
+
+  // regionInfoMap is a mapping from table name to region start key to
+  // HRegionInfo. This will be used in locateRegion and in turn in
+  // HConnection.locateRegion, so it needs to be thread-safe as the same
+  // HConnection can be used from multiple threads at the same time.
+  ConcurrentHashMap<String, ConcurrentSkipListMap<byte[], HRegionInfo>>
+          regionInfoMap = new ConcurrentHashMap<>();
+
+  public RegionLocator(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  public HRegionInfo findRegion(byte[] tableName, byte[] row) {
+    ConcurrentSkipListMap<byte[], HRegionInfo> regions =
+            regionInfoMap.get(Bytes.toString(tableName));
+    if (regions != null) {
+      Map.Entry<byte[], HRegionInfo> entry = regions.floorEntry(row);
+      if (entry != null) {
+        return entry.getValue();
+      }
+    }
+    return null;
+  }
+
+  public List<HTableDescriptor> getAllTables() {
+    List<HTableDescriptor> tables = new ArrayList<>(regionInfoMap.size());
+    for (ConcurrentSkipListMap<byte[], HRegionInfo> regionMapForTable :
+            regionInfoMap.values()) {
+      if (regionMapForTable.size() > 0) {
+        tables.add(regionMapForTable.firstEntry().getValue().getTableDesc());
+      }
+    }
+    return tables;
+  }
+
+  public List<List<HRegionInfo>> getAllRegionsGroupByTable() {
+    List<List<HRegionInfo>> regions = new ArrayList<>(regionInfoMap.size());
+    for (ConcurrentSkipListMap<byte[], HRegionInfo> regionMapForTable :
+            regionInfoMap.values()) {
+      regions.add(new ArrayList<>(regionMapForTable.values()));
+    }
+    return regions;
+  }
+
+  /**
+   * Get all servers found in the regionInfo map. This method iterates over all
+   * HRegionInfo entries and thus might be expensive.
+   *
+   * @return a set containing all servers found in the region map
+   */
+  public Set<HServerAddress> getAllServers() {
+    Set<HServerAddress> servers = new HashSet<>();
+    for (ConcurrentSkipListMap<byte[], HRegionInfo> regionMapForTable :
+            regionInfoMap.values()) {
+      for (HRegionInfo region : regionMapForTable.values()) {
+        for (HServerAddress server : region.getPeersWithRank().keySet()) {
+          servers.add(server);
+        }
+      }
+    }
+    return servers;
+  }
+
+  public List<HRegionInfo> getRegionsForTable(byte[] tableName) {
+    ConcurrentSkipListMap<byte[], HRegionInfo> regions =
+            regionInfoMap.get(Bytes.toString(tableName));
+    if (regions != null) {
+      return new ArrayList<>(regions.values());
+    } else {
+      return null;
+    }
+  }
+
+  public List<HRegionInfo> getRegionsForServer(final HServerAddress address) {
+    List<HRegionInfo> regions = new ArrayList<>();
+    for (ConcurrentSkipListMap<byte[], HRegionInfo> regionMapForTable :
+            regionInfoMap.values()) {
+      for (HRegionInfo region : regionMapForTable.values()) {
+        if (region.getPeersWithRank().containsKey(address)) {
+          regions.add(region);
+        }
+      }
+    }
+    return regions;
+  }
+
+  private void updateRegionInfoMap(final List<HRegionInfo> regions) {
+    for (HRegionInfo region : regions) {
+      String tableName = region.getTableDesc().getNameAsString();
+      ConcurrentSkipListMap<byte[], HRegionInfo> regionMapForTable
+              = regionInfoMap.get(tableName);
+      if (regionMapForTable == null) {
+        regionMapForTable = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+        regionInfoMap.put(tableName, regionMapForTable);
+      }
+      regionMapForTable.put(region.getStartKey(), region);
+    }
+  }
+
+  public void refresh() throws IOException, RMapException {
+    Parser parser = new Parser(conf);
+
+    URI uri = RMapConfiguration.getRMapSubscription(conf);
+    if (uri != null) {
+      RMapReader reader = RMapConfiguration.getRMapReader(conf, uri);
+
+      try {
+        JSONObject encodedRMap = reader.readRMap(uri).getEncodedRMap();
+        updateRegionInfoMap(parser.parseEncodedRMap(encodedRMap));
+      } catch (JSONException e) {
+        throw new RMapException("Failed to decode JSON for RMap: " + uri, e);
+      }
+    }
+  }
+
+  public boolean isEmpty() {
+    return regionInfoMap.isEmpty();
+  }
+}
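
findRegion() relies on the per-table map being keyed by region start key:
with a byte-wise comparator, floorEntry(row) yields the region whose start
key is the greatest one not after the row, i.e. the candidate region
containing it. A standalone illustration of that lookup:

    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FloorEntrySketch {
      public static void main(String[] args) {
        ConcurrentSkipListMap<byte[], String> byStartKey =
            new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
        byStartKey.put(Bytes.toBytes(""), "region-A");   // rows [ "", "m" )
        byStartKey.put(Bytes.toBytes("m"), "region-B");  // rows [ "m", ... )
        System.out.println(
            byStartKey.floorEntry(Bytes.toBytes("k")).getValue());  // region-A
        System.out.println(
            byStartKey.floorEntry(Bytes.toBytes("z")).getValue());  // region-B
      }
    }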

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendRequest.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendRequest.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendRequest.java
new file mode 100644
index 0000000..c2bd496
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendRequest.java
@@ -0,0 +1,217 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.hbase.consensus.util.RaftUtil;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+@ThriftStruct
+public final class AppendRequest extends Request<AppendResponse> {
+
+  private final String regionId;
+
+  private final ConsensusHost leaderId;
+
+  private final boolean isHeartBeat;
+
+  private final long commitIndex;
+
+  private final long persistedIndex;
+
+  private final EditId prevLogId;
+
+  private final List<EditId> logIds;
+
+  private final List<ByteBuffer> listOfEdits;
+
+  private boolean isTraceable = false;
+
+  private SettableFuture<AppendResponse> response;
+
+  @ThriftConstructor
+  public AppendRequest(
+      @ThriftField(1) final String regionId,
+      @ThriftField(2) final ConsensusHost id,
+      @ThriftField(3) final boolean isHeartBeat,
+      @ThriftField(4) final long commitIndex,
+      @ThriftField(5) final long persistedIndex,
+      @ThriftField(6) final EditId prevLogId,
+      @ThriftField(7) final List<EditId> logIds,
+      @ThriftField(8) final List<ByteBuffer> listOfEdits
+  ) {
+    this.regionId = regionId;
+    this.leaderId = id;
+    this.isHeartBeat = isHeartBeat;
+    this.commitIndex = commitIndex;
+    this.persistedIndex = persistedIndex;
+    this.prevLogId = prevLogId;
+    this.logIds = logIds;
+    this.listOfEdits = listOfEdits;
+    assert logIds.size() == listOfEdits.size();
+  }
+
+  public AppendRequest(final AppendRequest r) {
+    this.regionId = r.regionId;
+    this.leaderId = r.leaderId;
+    this.isHeartBeat = r.isHeartBeat;
+    this.commitIndex = r.commitIndex;
+    this.persistedIndex = r.persistedIndex;
+    this.prevLogId = r.prevLogId;
+    this.logIds = r.logIds;
+    this.listOfEdits = r.listOfEdits;
+  }
+
+  @ThriftField(1)
+  public String getRegionId() {
+    return regionId;
+  }
+
+  @ThriftField(2)
+  public ConsensusHost getLeaderId() {
+    return leaderId;
+  }
+
+  @ThriftField(3)
+  public boolean isHeartBeat() {
+    return this.isHeartBeat;
+  }
+
+  @ThriftField(4)
+  public long getCommitIndex() {
+    return commitIndex;
+  }
+
+  @ThriftField(5)
+  public long getPersistedIndex() {
+    return persistedIndex;
+  }
+
+  @ThriftField(6)
+  public EditId getPrevLogId() {
+    return prevLogId;
+  }
+
+  @ThriftField(7)
+  public List<EditId> getLogIds() {
+    return logIds;
+  }
+
+  @ThriftField(8)
+  public List<ByteBuffer> getListOfEdits() {
+    return listOfEdits;
+  }
+
+  public EditId getLogId(int index) {
+    return logIds.get(index);
+  }
+
+  public ByteBuffer getEdit(int index) {
+    return listOfEdits.get(index);
+  }
+
+  public void createAppendResponse() {
+    if (response == null) {
+      response = SettableFuture.create();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb = sb.append("AppendRequest{")
+      .append("region = ").append(regionId)
+      .append(", address = ").append(leaderId)
+      .append(", heartbeat = ").append(isHeartBeat)
+      .append(", commit index = ").append(commitIndex)
+      .append(", persisted index = ").append(persistedIndex)
+      .append(", prev log edit = ").append(prevLogId);
+
+    sb.append(", current edit logs = ")
+      .append(RaftUtil.listToString(logIds));
+
+    if (listOfEdits != null) {
+      sb.append(", edit sizes = [");
+
+      for (int i = 0; i < listOfEdits.size(); i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        if (listOfEdits.get(i) != null) {
+          sb.append("" + listOfEdits.get(i));
+        }
+      }
+      sb.append("] ");
+    }
+
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public ListenableFuture<AppendResponse> getResponse() {
+    return response;
+  }
+
+  public void setResponse(AppendResponse r) {
+    if (response != null) {
+      response.set(r);
+    }
+  }
+
+  public void setError(final Throwable exception) {
+    if (response != null) {
+      response.setException(exception);
+    }
+  }
+
+  public boolean validateFields() {
+    assert getLogIds() != null;
+    assert getListOfEdits() != null;
+    assert getLogIds().size() == getListOfEdits().size();
+    return true;
+  }
+
+  public int logCount() {
+    assert validateFields();
+    return getLogIds().size();
+  }
+
+  public static AppendRequest createSingleAppendRequest(
+    final String regionId,
+    final ConsensusHost id,
+    final EditId logId,
+    final EditId prevLogId,
+    final long commitIndex,
+    final long persistedIndex,
+    final boolean isHeartBeat,
+    final ByteBuffer edits) {
+    return new AppendRequest(
+      regionId,
+      id,
+      isHeartBeat,
+      commitIndex,
+      persistedIndex,
+      prevLogId,
+      Arrays.asList(logId),
+      Arrays.asList(edits));
+  }
+
+  public void enableTraceable() {
+    this.isTraceable = true;
+  }
+
+  public void disableTraceable() {
+    this.isTraceable = false;
+  }
+
+  public boolean isTraceable() {
+    return this.isTraceable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendResponse.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendResponse.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendResponse.java
new file mode 100644
index 0000000..8e7c1e0
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/AppendResponse.java
@@ -0,0 +1,94 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+
+@ThriftStruct
+public final class AppendResponse {
+
+  public enum Result {
+    SUCCESS, HIGHER_TERM, LAGGING, MISSING_EDITS
+  }
+
+  private final String address;
+
+  /** The identifier to associate the AppendRequest and AppendResponse. */
+  private final EditId id;
+
+  private final EditId prevEditID;
+
+  private final Result result;
+
+  private final int rank;
+
+  private final boolean canTakeover;
+
+  @ThriftConstructor
+  public AppendResponse(String address,
+                         final EditId id,
+                         final EditId prevEditID,
+                         Result result,
+                         int rank,
+                         boolean canTakeover) {
+    this.address = address;
+    this.id = id;
+    this.prevEditID = prevEditID;
+    this.result = result;
+    this.rank = rank;
+    this.canTakeover = canTakeover;
+  }
+
+  @ThriftField(1)
+  public String getAddress() {
+    return address;
+  }
+
+  /**
+   * @return the identifier to associate the AppendRequest and AppendResponse.
+   */
+  @ThriftField(2)
+  public EditId getId() {
+    return id;
+  }
+
+  @ThriftField(3)
+  public EditId getPrevEditID() {
+    return prevEditID;
+  }
+
+  @ThriftField(4)
+  public Result getResult() {
+    return result;
+  }
+
+  @ThriftField(5)
+  public int getRank() {
+    return rank;
+  }
+
+  @ThriftField(6)
+  public boolean canTakeover() {
+    return canTakeover;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder s = new StringBuilder();
+
+    s.append("AppendResponse{")
+      .append("address= ").append(address)
+      .append(", id=").append(id)
+      .append(", prevEditID=").append(prevEditID)
+      .append(", result=").append(result)
+      .append(", rank=").append(rank)
+      .append(", canTakeOver=").append(canTakeover)
+      .append('}');
+
+    return s.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/LogState.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/LogState.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/LogState.java
new file mode 100644
index 0000000..0d2dae3
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/LogState.java
@@ -0,0 +1,151 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import org.apache.hadoop.hbase.consensus.log.LogFileInfo;
+import org.apache.hadoop.hbase.consensus.log.SeedLogFile;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@ThriftStruct
+public final class LogState {
+  private List<LogFileInfo> committedLogFiles;
+  private List<LogFileInfo> uncommittedLogFiles;
+  private String peerState;
+  private EditId lastCommittedEdit;
+
+  // Used to pass an error message. If not null, the other fields are
+  // considered invalid and should be discarded.
+  private String errMsg;
+
+  public LogState(String errMsg) {
+    this.errMsg = errMsg;
+    this.peerState = null;
+    lastCommittedEdit = null;
+
+    if (this.errMsg == null) {
+      this.committedLogFiles = new ArrayList<>();
+      this.uncommittedLogFiles = new ArrayList<>();
+    }
+  }
+
+  @ThriftConstructor
+  public LogState(
+      @ThriftField(1) List<LogFileInfo> committedLogFiles,
+      @ThriftField(2) List<LogFileInfo> uncommittedLogFiles,
+      @ThriftField(3) String peerState,
+      @ThriftField(4) String errMsg,
+      @ThriftField(5) EditId lastCommittedEdit) {
+    this.committedLogFiles = committedLogFiles;
+    this.uncommittedLogFiles = uncommittedLogFiles;
+    this.peerState = peerState;
+    this.errMsg = errMsg;
+    this.lastCommittedEdit = lastCommittedEdit;
+  }
+
+  @ThriftField(1)
+  public List<LogFileInfo> getCommittedLogFiles() {
+    return this.committedLogFiles;
+  }
+
+  @ThriftField(2)
+  public List<LogFileInfo> getUncommittedLogFiles() {
+    return this.uncommittedLogFiles;
+  }
+
+  @ThriftField(3)
+  public String getPeerState() {
+    return this.peerState;
+  }
+
+  @ThriftField(4)
+  public String getErrMsg() {
+    return this.errMsg;
+  }
+
+  @ThriftField(5)
+  public EditId getLastCommittedEdit() {
+    return this.lastCommittedEdit;
+  }
+
+  public void setPeerState(String peerState) {
+    this.peerState = peerState;
+  }
+
+  public void addCommittedLogFile(LogFileInfo info) {
+    this.committedLogFiles.add(info);
+  }
+
+  public void addUncommittedLogFile(LogFileInfo info) {
+    this.uncommittedLogFiles.add(info);
+  }
+
+  public void setErrMsg(String errMsg) {
+    this.errMsg = errMsg;
+  }
+
+  public void setLastCommittedEdit(EditId lastCommittedEdit) {
+    this.lastCommittedEdit = lastCommittedEdit;
+  }
+
+  public boolean isErrorState() {
+    return errMsg != null;
+  }
+
+  public void sortLogFiles() {
+    if (committedLogFiles != null && !committedLogFiles.isEmpty()) {
+      Collections.sort(committedLogFiles);
+    }
+    if (uncommittedLogFiles != null && !uncommittedLogFiles.isEmpty()) {
+      Collections.sort(uncommittedLogFiles);
+    }
+  }
+
+  private Pair<Long, Long> getIndexRange(List<LogFileInfo> logFiles) {
+    if (logFiles == null || logFiles.isEmpty()) {
+      return null;
+    }
+    Long startIndex = logFiles.get(0).getInitialIndex();
+    Long lastIndex = logFiles.get(logFiles.size() - 1).getLastIndex();
+    return new Pair<>(startIndex, lastIndex);
+  }
+
+  @Override
+  public String toString() {
+    if (errMsg != null) {
+      return errMsg;
+    }
+
+    sortLogFiles();
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ ");
+
+    Pair<Long, Long> uncommittedRange = getIndexRange(uncommittedLogFiles);
+    if (uncommittedRange != null) {
+      sb.append("Uncommitted [").append(uncommittedRange.getFirst())
+          .append(", ").append(uncommittedRange.getSecond()).append("] ");
+    }
+
+    Pair<Long, Long> committedRange = getIndexRange(committedLogFiles);
+    if (committedRange != null) {
+      sb.append("Committed [").append(committedRange.getFirst())
+          .append(", ").append(committedRange.getSecond()).append("] ");
+    }
+    for (LogFileInfo info : committedLogFiles) {
+      if (SeedLogFile.isSeedFile(info.getFilename())) {
+        sb.append("Seed File [").append(info.getInitialIndex()).append(", ")
+            .append(info.getLastIndex()).append("]");
+      }
+    }
+    sb.append(" } ; {Peers: ");
+    sb.append(peerState);
+    sb.append(" }");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/PeerStatus.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/PeerStatus.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/PeerStatus.java
new file mode 100644
index 0000000..cbd28ce
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/PeerStatus.java
@@ -0,0 +1,107 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftEnum;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import org.apache.hadoop.hbase.regionserver.DataStoreState;
+
+@ThriftStruct
+public final class PeerStatus implements Comparable<PeerStatus> {
+
+  @ThriftEnum
+  public enum RAFT_STATE {
+    INVALID,
+    LEADER,
+    FOLLOWER,
+    CANDIDATE,
+    HALT
+  }
+
+  private final String id;
+  private final int rank;
+  private final long term;
+  private final RAFT_STATE paxosState;
+  private final LogState logState;
+  private final String metrics;
+  private final DataStoreState dataStoreState;
+
+  private String peerAddress;
+
+  @ThriftConstructor
+  public PeerStatus(
+    @ThriftField(1) final String id,
+    @ThriftField(2) final int rank,
+    @ThriftField(3) final long term,
+    @ThriftField(4) final RAFT_STATE paxosState,
+    @ThriftField(5) final LogState logState,
+    @ThriftField(6) final String metrics,
+    @ThriftField(7) final DataStoreState dataStoreState) {
+    this.id = id;
+    this.rank = rank;
+    this.term = term;
+    this.paxosState = paxosState;
+    this.logState = logState;
+    this.metrics = metrics;
+    this.dataStoreState = dataStoreState;
+  }
+
+  @ThriftField(1)
+  public String getId() {
+    return id;
+  }
+
+  @ThriftField(2)
+  public int getRank() {
+    return rank;
+  }
+
+  @ThriftField(3)
+  public long getTerm() {
+    return term;
+  }
+
+  @ThriftField(4)
+  public RAFT_STATE getPaxosState() {
+    return paxosState;
+  }
+
+  @ThriftField(5)
+  public LogState getLogState() {
+    return logState;
+  }
+
+  @ThriftField(6)
+  public String getMetrics() {
+    return metrics;
+  }
+
+  @ThriftField(7)
+  public DataStoreState getDataStoreState() {
+    return dataStoreState;
+  }
+
+  public String getPeerAddress() {
+    return peerAddress;
+  }
+
+  public void setPeerAddress(String peerAddress) {
+    this.peerAddress = peerAddress;
+  }
+
+  @Override
+  public String toString() {
+    return "Peer : " + peerAddress + " {" + "id=" + id + "-" + rank
+           + "term=" + term + ", " + "paxosState=" + paxosState + ", "
+           + "logState=" + logState + ", " + "dataStoreState=" + dataStoreState
+           + "}";
+  }
+
+  @Override
+  public int compareTo(PeerStatus peer) {
+    return Integer.compare(this.rank, peer.getRank());
+  }
+}

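Since PeerStatus orders itself by rank via compareTo, callers can sort peer
statuses directly. A minimal sketch, assuming a ConsensusService handle named
`service` (the interface appears later in this patch) and eliding exception
handling:

    List<PeerStatus> peers =
        new ArrayList<>(service.getAllPeerStatuses().get());
    Collections.sort(peers);   // ascending rank, per compareTo
    for (PeerStatus p : peers) {
      System.out.println(p.getRank() + " " + p.getId() + " " + p.getPaxosState());
    }
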
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/Request.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/Request.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/Request.java
new file mode 100644
index 0000000..a90af7b
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/Request.java
@@ -0,0 +1,8 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class Request<T> {
+  public abstract void setResponse(final T response);
+  public abstract ListenableFuture<T> getResponse();
+}

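The concrete requests below follow a common pattern: the RPC layer returns
the future from getResponse() immediately, and the consensus state machine
completes it later via setResponse(). A minimal sketch of a subclass (the
PingRequest name is illustrative only; imports come from
com.google.common.util.concurrent):

    public class PingRequest extends Request<String> {
      // The SettableFuture is completed once the state machine has an answer.
      private final SettableFuture<String> response = SettableFuture.create();

      @Override
      public void setResponse(final String r) {
        response.set(r);
      }

      @Override
      public ListenableFuture<String> getResponse() {
        return response;
      }
    }
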
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteRequest.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteRequest.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteRequest.java
new file mode 100644
index 0000000..bb7660a
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteRequest.java
@@ -0,0 +1,94 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.hbase.consensus.protocol.EditId;
+
+@ThriftStruct
+public final class VoteRequest extends Request<VoteResponse> {
+
+  private final String regionId;
+  private final String address;
+  private final long term;
+  private final EditId prevEditID;
+
+  private SettableFuture<VoteResponse> response;
+
+  @ThriftConstructor
+  public VoteRequest(
+      @ThriftField(1) final String regionId,
+      @ThriftField(2) final String address,
+      @ThriftField(3) final long term,
+      @ThriftField(4) final EditId prevEditID) {
+    this.regionId = regionId;
+    this.address = address;
+    this.term = term;
+    this.prevEditID = prevEditID;
+  }
+
+  public VoteRequest(final VoteRequest r) {
+    this.regionId = r.regionId;
+    this.address = r.address;
+    this.term = r.term;
+    this.prevEditID = r.prevEditID;
+  }
+
+  @ThriftField(1)
+  public String getRegionId() {
+    return regionId;
+  }
+
+  @ThriftField(2)
+  public String getAddress() {
+    return address;
+  }
+
+  @ThriftField(3)
+  public long getTerm() {
+    return term;
+  }
+
+  @ThriftField(4)
+  public EditId getPrevEditID() {
+    return prevEditID;
+  }
+
+  @Override
+  public String toString() {
+    return "VoteRequest{" +
+        "region=" + regionId +
+        ", address='" + address + '\'' +
+        ", term=" + term +
+        ", prevEditID=" + prevEditID +
+        '}';
+  }
+
+  public void createVoteResponse() {
+    if (response == null) {
+      response = SettableFuture.create();
+    }
+  }
+
+  @Override
+  public void setResponse(VoteResponse r) {
+    if (response != null) {
+      response.set(r);
+    }
+  }
+
+  @Override
+  public ListenableFuture<VoteResponse> getResponse() {
+    return response;
+  }
+
+  public void setError(final Throwable exception) {
+    if (response != null) {
+      response.setException(exception);
+    }
+  }
+}

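Tying the pieces together, the lifecycle of a vote request looks roughly as
follows (a sketch only; `prevEditId`, the term, and the peer address are
placeholders, and exception handling is elided):

    VoteRequest req = new VoteRequest("region-1", "host-a:6000", 5L, prevEditId);
    req.createVoteResponse();        // allocate the SettableFuture
    // ... the request is offered to the Raft state machine, which eventually
    // answers with:
    req.setResponse(new VoteResponse("host-a:6000", 5L,
        VoteResponse.VoteResult.SUCCESS));
    VoteResponse resp = req.getResponse().get();

Note that setResponse() and setError() are silently ignored unless
createVoteResponse() was called first, since the future is created lazily.
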
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteResponse.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteResponse.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteResponse.java
new file mode 100644
index 0000000..3986f3c
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/rpc/VoteResponse.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hbase.consensus.rpc;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftEnum;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
+@ThriftStruct
+public final class VoteResponse {
+  final String address;
+  final long term;
+  final VoteResult voteResult;
+
+  @ThriftEnum
+  public enum VoteResult {
+    SUCCESS,
+    FAILURE,
+    WRONGQUORUM
+  }
+
+  @ThriftConstructor
+  public VoteResponse(
+      @ThriftField(1) final String address,
+      @ThriftField(2) final long term,
+      @ThriftField(3) final VoteResult voteResult) {
+    this.address = address;
+    this.term = term;
+    this.voteResult = voteResult;
+  }
+
+  @ThriftField(1)
+  public String getAddress() {
+    return address;
+  }
+
+  @ThriftField(2)
+  public long getTerm() {
+    return term;
+  }
+
+  @ThriftField(3)
+  public VoteResult voteResult() {
+    return voteResult;
+  }
+
+  public boolean isSuccess() {
+    return voteResult.equals(VoteResult.SUCCESS);
+  }
+
+  public boolean isWrongQuorum() {
+    return voteResult.equals(VoteResult.WRONGQUORUM);
+  }
+
+  @Override
+  public String toString() {
+    return "VoteResponse{" +
+        "address=" + address +
+        " term=" + term +
+        ", voteResult=" + voteResult +
+        '}';
+  }
+}

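A candidate tallies these responses to decide an election. A hedged sketch,
assuming a collection `responses` and a known `quorumSize`:

    int votes = 0;
    for (VoteResponse r : responses) {
      if (r.isSuccess()) {
        votes++;
      } else if (r.isWrongQuorum()) {
        // This peer no longer considers us part of its quorum; its vote
        // cannot count, and the membership view should be re-checked.
      }
    }
    boolean elected = votes > quorumSize / 2;   // strict majority wins
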
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusService.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusService.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusService.java
new file mode 100644
index 0000000..e061cc3
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusService.java
@@ -0,0 +1,70 @@
+package org.apache.hadoop.hbase.consensus.server;
+
+import com.facebook.swift.service.ThriftException;
+import com.facebook.swift.service.ThriftMethod;
+import com.facebook.swift.service.ThriftService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hbase.consensus.client.FetchTask;
+import org.apache.hadoop.hbase.consensus.log.LogFileInfo;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.apache.hadoop.hbase.consensus.rpc.AppendResponse;
+import org.apache.hadoop.hbase.consensus.rpc.PeerStatus;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.consensus.rpc.VoteResponse;
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+@ThriftService
+public interface ConsensusService extends AutoCloseable {
+
+  @ThriftMethod(value = "appendEntries", exception = {
+      @ThriftException(type = ThriftHBaseException.class, id = 1) })
+  ListenableFuture<AppendResponse> appendEntries(AppendRequest appendRequest);
+
+  @ThriftMethod(value = "requestVote", exception = {
+      @ThriftException(type = ThriftHBaseException.class, id = 1) })
+  ListenableFuture<VoteResponse> requestVote(VoteRequest request);
+
+  @ThriftMethod
+  ListenableFuture<PeerStatus> getPeerStatus(String quorumName);
+
+  @ThriftMethod(value = "replicateCommit", exception = {
+        @ThriftException(type = ThriftHBaseException.class, id = 1) })
+  ListenableFuture<Long> replicateCommit(String regionId, List<WALEdit> txns)
+    throws ThriftHBaseException;
+
+  @ThriftMethod
+  ListenableFuture<Boolean> changeQuorum(String regionId, final ByteBuffer config);
+
+  @ThriftMethod
+  ListenableFuture<String> getLeader(String regionId);
+
+  @ThriftMethod
+  ListenableFuture<List<LogFileInfo>> getCommittedLogStatus(String quorumName,
+                                                            long minIndex);
+
+  // TODO @gauravm: remove?
+  @ThriftMethod
+  ListenableFuture<Void> fetchLogs(List<FetchTask> tasks, String regionId);
+
+  @ThriftMethod
+  ListenableFuture<List<PeerStatus>> getAllPeerStatuses();
+
+  ImmutableMap<String, RaftQuorumContext> getQuorumContextMapSnapshot();
+
+  RaftQuorumContext getRaftQuorumContext(String regionId);
+
+  RaftQuorumContext addRaftQuorumContext(final RaftQuorumContext c);
+
+  // TODO @gauravm: remove?
+  boolean removeRaftQuorumContext(final String regionName);
+
+  void stopService();
+}

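From the client side, the interface can be used to probe a quorum. A
hypothetical snippet, assuming `service` is a Thrift proxy bound to this
interface and "region-1" is a placeholder region, with exception handling
elided:

    String leader = service.getLeader("region-1").get();
    if (leader.isEmpty()) {
      // No leader is known yet; inspect a peer directly instead.
      PeerStatus status = service.getPeerStatus("region-1").get();
      System.out.println("peer state: " + status.getPaxosState());
    }
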
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusServiceImpl.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusServiceImpl.java
new file mode 100644
index 0000000..6bb9eac
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/ConsensusServiceImpl.java
@@ -0,0 +1,248 @@
+package org.apache.hadoop.hbase.consensus.server;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.consensus.client.FetchTask;
+import org.apache.hadoop.hbase.consensus.log.LogFileInfo;
+import org.apache.hadoop.hbase.consensus.log.RemoteLogFetcher;
+import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost;
+import org.apache.hadoop.hbase.consensus.quorum.QuorumAgent;
+import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo;
+import org.apache.hadoop.hbase.consensus.quorum.QuorumMembershipChangeRequest;
+import org.apache.hadoop.hbase.consensus.quorum.RaftQuorumContext;
+import org.apache.hadoop.hbase.consensus.raft.events.AppendRequestEvent;
+import org.apache.hadoop.hbase.consensus.raft.events.QuorumMembershipChangeEvent;
+import org.apache.hadoop.hbase.consensus.raft.events.VoteRequestEvent;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.apache.hadoop.hbase.consensus.rpc.AppendResponse;
+import org.apache.hadoop.hbase.consensus.rpc.LogState;
+import org.apache.hadoop.hbase.consensus.rpc.PeerStatus;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.consensus.rpc.VoteResponse;
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.regionserver.DataStoreState;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ConsensusServiceImpl implements ConsensusService {
+  private static final Logger LOG = LoggerFactory.getLogger(
+    ConsensusServiceImpl.class);
+
+  /** TODO: replace this with HRegionServer.getRegions() API once we
+   *  integrate HBase with the protocol
+   */
+  private final ConcurrentHashMap<String, RaftQuorumContext> quorumContextMap =
+    new ConcurrentHashMap<>();
+
+  protected ConsensusServiceImpl() {}
+
+  public static ConsensusService createConsensusServiceImpl() {
+    return new ConsensusServiceImpl();
+  }
+
+  public static ConsensusService createTestConsensusServiceImpl() {
+    return new InstrumentedConsensusServiceImpl();
+  }
+
+  @Override
+  public ListenableFuture<AppendResponse> appendEntries(AppendRequest appendRequest) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Receiving " + appendRequest);
+    }
+    boolean success = false;
+
+    appendRequest.createAppendResponse();
+
+    RaftQuorumContext c = getRaftQuorumContext(appendRequest.getRegionId());
+    if (c != null) {
+      // Add an event for the correct state machine
+      success = c.offerEvent(new AppendRequestEvent(appendRequest));
+    }
+
+    if (!success) {
+      appendRequest.setError(new ThriftHBaseException(new Exception(
+          "Unable to complete AppendEntries: " + appendRequest)));
+    }
+    return appendRequest.getResponse();
+  }
+
+  @Override
+  public ListenableFuture<VoteResponse> requestVote(VoteRequest voteRequest) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Receiving " + voteRequest);
+    }
+
+    voteRequest.createVoteResponse();
+
+    RaftQuorumContext c = getRaftQuorumContext(voteRequest.getRegionId());
+
+    boolean success = false;
+    if (c != null) {
+      // Add an event for the correct state machine
+      success = c.offerEvent(new VoteRequestEvent(voteRequest));
+    } else {
+      LOG.error("There is no such region " + voteRequest.getRegionId() +
+        " in the current quorum server");
+    }
+
+    if (!success) {
+      voteRequest.setError(new ThriftHBaseException(
+          new Exception("Unable to complete Vote Request: " + voteRequest)));
+    }
+
+    return voteRequest.getResponse();
+  }
+
+  @Override
+  public ListenableFuture<Long> replicateCommit(String regionId,
+      List<WALEdit> txns) throws ThriftHBaseException {
+    RaftQuorumContext c = getRaftQuorumContext(regionId);
+    if (c != null) {
+      QuorumAgent agent = c.getQuorumAgentInstance();
+      try {
+        return agent.asyncAppend(txns);
+      } catch (IOException e) {
+        throw new ThriftHBaseException(e);
+      }
+    } else {
+      Exception e = new Exception("Unable to find the " +
+              "region information for " + regionId);
+      LOG.error(e.getMessage());
+      return Futures.immediateFailedFuture(new ThriftHBaseException(e));
+    }
+  }
+
+  @Override
+  public ListenableFuture<Boolean> changeQuorum(String regionId, ByteBuffer config) {
+
+    RaftQuorumContext c = getRaftQuorumContext(regionId);
+
+    if (c != null) {
+      List<QuorumInfo> configs = QuorumInfo.deserializeFromByteBuffer(config);
+      if (configs == null || configs.size() != 1) {
+        return Futures.immediateCheckedFuture(false);
+      }
+      // Make sure the config request change is really needed.
+      if (configs.get(0).equals(c.getQuorumInfo())) {
+        return Futures.immediateCheckedFuture(true);
+      }
+
+      final QuorumMembershipChangeRequest request =
+        new QuorumMembershipChangeRequest(configs.get(0));
+      // Add an event for the correct state machine
+      boolean success = c.offerEvent(new QuorumMembershipChangeEvent(request));
+      if (!success) {
+        return Futures.immediateCheckedFuture(false);
+      }
+      return request;
+    } else {
+      LOG.debug("Ignored " + config);
+    }
+
+    return Futures.immediateCheckedFuture(false);
+  }
+
+  @Override
+  public ListenableFuture<String> getLeader(String regionId) {
+    RaftQuorumContext c = getRaftQuorumContext(regionId);
+    String leader = "";
+    if (c != null) {
+      ConsensusHost host = c.getLeader();
+      if (host != null) {
+        leader = host.getHostId();
+      }
+    }
+    if (leader.isEmpty() && LOG.isTraceEnabled()) {
+      LOG.trace("Leader is unknown for " + regionId);
+    }
+    return Futures.immediateFuture(leader);
+  }
+
+  @Override
+  public ListenableFuture<PeerStatus> getPeerStatus(String quorum) {
+    RaftQuorumContext c = getRaftQuorumContext(quorum);
+
+    if (c != null) {
+      return Futures.immediateFuture(c.getStatus());
+    } else {
+      return Futures.immediateFuture(new PeerStatus(quorum, -1, HConstants.UNDEFINED_TERM_INDEX,
+        PeerStatus.RAFT_STATE.INVALID, new LogState("Wrong Quorum"), "Error",
+        new DataStoreState("Wrong Quorum")));
+    }
+  }
+
+  @Override
+  public ListenableFuture<List<LogFileInfo>> getCommittedLogStatus(
+      String quorumName, long minIndex) {
+    RaftQuorumContext c = getRaftQuorumContext(quorumName);
+    List<LogFileInfo> info = null;
+
+    if (c != null) {
+      info = c.getCommittedLogStatus(minIndex);
+    } else {
+      info = new ArrayList<>();
+    }
+
+    return Futures.immediateFuture(info);
+  }
+
+  @Override
+  public void stopService() {
+    LOG.info("Clear all the RaftQuorum from the quorumContextMap !");
+    // stop all the context before clearing
+    for (RaftQuorumContext context : quorumContextMap.values()) {
+      context.stop(true);
+    }
+
+    this.quorumContextMap.clear();
+  }
+
+  @Override
+  public ImmutableMap<String, RaftQuorumContext> getQuorumContextMapSnapshot() {
+    return ImmutableMap.copyOf(this.quorumContextMap);
+  }
+
+  @Override
+  public RaftQuorumContext addRaftQuorumContext(final RaftQuorumContext c) {
+    LOG.info("Adding RaftQuorumContext " + c + " to the quorumContextMap");
+    return this.quorumContextMap.put(c.getQuorumName(), c);
+  }
+
+  @Override
+  public boolean removeRaftQuorumContext(String regionName) {
+    LOG.info("Removing quorum for the region " + regionName + " from the quorumContextMap");
+    return this.quorumContextMap.remove(regionName) != null;
+  }
+
+  @Override
+  public RaftQuorumContext getRaftQuorumContext(String regionId) {
+    return this.quorumContextMap.get(regionId);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do nothing; AutoCloseable is implemented only for client-side cleanup.
+  }
+
+  @Override
+  public ListenableFuture<Void> fetchLogs(List<FetchTask> tasks, String regionId) {
+    return new RemoteLogFetcher().executeTasks(tasks, regionId);
+  }
+
+  @Override
+  public ListenableFuture<List<PeerStatus>> getAllPeerStatuses() {
+    List<PeerStatus> statuses = new ArrayList<>();
+    Map<String, RaftQuorumContext> quorumMap = getQuorumContextMapSnapshot();
+
+    for (RaftQuorumContext quorum : quorumMap.values()) {
+      statuses.add(quorum.getStatus());
+    }
+
+    return Futures.immediateFuture(statuses);
+  }
+}

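Server-side wiring is then a matter of registering one RaftQuorumContext per
region. A sketch only; constructing the context itself happens outside this
file, so `context` is assumed here:

    ConsensusService service = ConsensusServiceImpl.createConsensusServiceImpl();
    service.addRaftQuorumContext(context);
    // appendEntries/requestVote calls are now translated into events on the
    // per-region state machine until the service is stopped.
    service.stopService();   // stops every context and clears the map
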
http://git-wip-us.apache.org/repos/asf/hbase/blob/eca32aa4/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/InstrumentedConsensusServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/InstrumentedConsensusServiceImpl.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/InstrumentedConsensusServiceImpl.java
new file mode 100644
index 0000000..7b10e7d
--- /dev/null
+++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/InstrumentedConsensusServiceImpl.java
@@ -0,0 +1,241 @@
+package org.apache.hadoop.hbase.consensus.server;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hbase.consensus.rpc.AppendRequest;
+import org.apache.hadoop.hbase.consensus.rpc.AppendResponse;
+import org.apache.hadoop.hbase.consensus.rpc.VoteRequest;
+import org.apache.hadoop.hbase.consensus.rpc.VoteResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MarkerFactory;
+
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class InstrumentedConsensusServiceImpl extends ConsensusServiceImpl {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InstrumentedConsensusServiceImpl.class);
+
+  private final ConcurrentHashMap<String, Double> packetDropRateMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Long> packetDelayMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Boolean> hiccupMap = new ConcurrentHashMap<>();
+  private final Random random = new Random(System.currentTimeMillis());
+  private RelayThread relayThread;
+  private String identity = "None";
+  private AtomicLong normalPacketDropCount = new AtomicLong(0);
+  private AtomicLong hiccupPacketDropCount = new AtomicLong(0);
+
+  protected InstrumentedConsensusServiceImpl() {
+    relayThread = new RelayThread(this);
+    relayThread.start();
+  }
+
+  public synchronized void setIdentity(String str) {
+    identity = str;
+  }
+
+  public synchronized String getIdentity() {
+    return identity;
+  }
+
+  public enum PacketDropStyle {
+    NONE,
+    ALWAYS,
+    RANDOM
+  }
+
+  public class RelayThread extends Thread {
+    private InstrumentedConsensusServiceImpl cs;
+    private DelayQueue<DelayedRequest> queue = new DelayQueue<DelayedRequest>();
+
+    public class DelayedRequest implements Delayed {
+      private long    deadline;
+      private Object  request;
+
+      public DelayedRequest(Object request, long delay) {
+        deadline = delay + System.currentTimeMillis();
+        this.request = request;
+      }
+
+      @Override
+      public long getDelay(TimeUnit unit) {
+        return unit.convert(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      }
+
+      public long getDeadline() {
+        return deadline;
+      }
+
+      @Override
+      public int compareTo(Delayed other) {
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
+            other.getDelay(TimeUnit.MILLISECONDS));
+      }
+
+      public Object request() {
+        return request;
+      }
+    }
+
+    public RelayThread(InstrumentedConsensusServiceImpl cs) {
+      this.cs = cs;
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          DelayedRequest request = queue.take();
+          LOG.info("-------- [" + getIdentity() + "] draining request " + request.request());
+          if (request.request() instanceof AppendRequest) {
+            cs.reallyAppendEntries((AppendRequest) request.request());
+          } else if (request.request() instanceof VoteRequest) {
+            cs.reallyRequestVote((VoteRequest) request.request());
+          } else {
+            LOG.error(MarkerFactory.getMarker("FATAL"),
+                    "Incorrect request type found : " + request.request());
+            System.exit(1);
+          }
+        } catch (InterruptedException ex) {
+          LOG.warn("RelayThread is interrupted; time to die!");
+          return;
+        } catch (Exception ex) {
+          LOG.warn("Caught exception:\n" + ex);
+        }
+      }
+    }
+
+    public <T> T queueRequest(T request, long delay) {
+      try {
+        LOG.info("-------- [" + getIdentity() + "] queueing request "
+            + request + " with delay " + delay + " ms");
+        queue.offer(new DelayedRequest(request, delay));
+      } catch (NullPointerException ex) {
+        // A null request cannot be queued; log it rather than fail silently.
+        LOG.warn("Failed to queue request", ex);
+      }
+      return request;
+    }
+  }
+
+  public void setPacketDelay(long delay) {
+    packetDelayMap.clear();
+    packetDelayMap.put("*", delay);
+  }
+
+  public void setPacketDelay(String src, long delay) {
+    packetDelayMap.put(src, delay);
+  }
+
+  public long getPacketDelay(String src) {
+    // Fall back to the wildcard entry, then to no delay at all.
+    Long delay = packetDelayMap.get(src);
+    if (delay == null) {
+      delay = packetDelayMap.get("*");
+    }
+    return delay != null ? delay : 0L;
+  }
+
+  public synchronized void setHiccup(final String src, boolean inHiccup) {
+    hiccupMap.put(src, inHiccup);
+  }
+
+  public synchronized boolean inHiccup(final String src) {
+    return Boolean.TRUE.equals(hiccupMap.get(src));
+  }
+
+  public synchronized void setPacketDropRate(final String src, final double rate) {
+    packetDropRateMap.put(src, rate);
+  }
+
+  public synchronized void setPacketDropStyle(final PacketDropStyle style) {
+    packetDropRateMap.clear();
+    if (style == null) {
+      return;
+    }
+    switch (style) {
+      case ALWAYS:
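+        // Any rate above 1.0 makes random.nextDouble() < rate always true,
+        // so every packet is dropped.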
+        packetDropRateMap.put("*", 2.0);
+        break;
+      case RANDOM:
+        packetDropRateMap.put("*", 0.5);
+        break;
+      case NONE:
+        packetDropRateMap.put("*", 0.0);
+        break;
+    }
+  }
+
+  private Double getPacketDropRate(String src) {
+    Double dropRate = packetDropRateMap.get(src);
+    return dropRate != null ? dropRate : packetDropRateMap.get("*");
+  }
+
+  public ListenableFuture<AppendResponse> reallyAppendEntries(AppendRequest appendRequest) {
+    return super.appendEntries(appendRequest);
+  }
+
+  public ListenableFuture<VoteResponse> reallyRequestVote(VoteRequest voteRequest) {
+    return super.requestVote(voteRequest);
+  }
+
+  public long getNormalPacketDropCount() {
+    return normalPacketDropCount.get();
+  }
+
+  public long getHiccupPacketDropCount() {
+    return hiccupPacketDropCount.get();
+  }
+
+  public long getPacketDropCount() {
+    return getHiccupPacketDropCount() + getNormalPacketDropCount();
+  }
+
+  @Override
+  public ListenableFuture<AppendResponse> appendEntries(AppendRequest appendRequest) {
+    appendRequest.createAppendResponse();
+    String src = appendRequest.getLeaderId().getHostId();
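+    // "Dropping" a packet here means returning the response future without
+    // ever completing it, so the caller times out as if the message were lost.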
+    if (inHiccup(src)) {
+        LOG.debug("-------- [" + getIdentity() + "] Dropping packet due to hiccup: " + appendRequest);
+        hiccupPacketDropCount.incrementAndGet();
+        return appendRequest.getResponse();
+    }
+    Double dropRate = getPacketDropRate(src);
+    if (dropRate != null && random.nextDouble() < dropRate) {
+        LOG.debug("-------- [" + getIdentity() + "] Dropping packet " + appendRequest);
+        normalPacketDropCount.incrementAndGet();
+        return appendRequest.getResponse();
+    }
+    return relayThread.queueRequest(appendRequest, getPacketDelay(src)).getResponse();
+  }
+
+  @Override
+  public ListenableFuture<VoteResponse> requestVote(VoteRequest voteRequest) {
+    voteRequest.createVoteResponse();
+    String src = voteRequest.getAddress();
+    if (inHiccup(src)) {
+        LOG.debug("-------- [" + getIdentity() + "] Dropping packet due to hiccup: " + voteRequest);
+        hiccupPacketDropCount.incrementAndGet();
+        return voteRequest.getResponse();
+    }
+    Double dropRate = getPacketDropRate(src);
+    if (dropRate != null && random.nextDouble() < dropRate) {
+      normalPacketDropCount.incrementAndGet();
+      LOG.debug("-------- [" + getIdentity() + "] Dropping packet " + voteRequest);
+      return voteRequest.getResponse();
+    }
+    return relayThread.queueRequest(voteRequest, getPacketDelay(src)).getResponse();
+  }
+}

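The instrumented variant exists for fault-injection tests. A sketch of a
typical setup; the cast is safe because createTestConsensusServiceImpl()
constructs this class, and the peer address is a placeholder:

    InstrumentedConsensusServiceImpl ics = (InstrumentedConsensusServiceImpl)
        ConsensusServiceImpl.createTestConsensusServiceImpl();
    ics.setIdentity("peer-1");
    ics.setPacketDelay(50);      // delay every source by 50 ms
    ics.setPacketDropStyle(InstrumentedConsensusServiceImpl.PacketDropStyle.RANDOM);
    ics.setHiccup("host-a:6000", true);   // simulate a partition from host-a
    // ... drive traffic through appendEntries()/requestVote(), then:
    long dropped = ics.getPacketDropCount();
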
