hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [1/2] hbase git commit: HBASE-19618 Remove replicationQueuesClient.class/replicationQueues.class config and remove table based ReplicationQueuesClient/ReplicationQueues implementation
Date Tue, 26 Dec 2017 06:45:31 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 5f548146a -> 2abf7b508


HBASE-19618 Remove replicationQueuesClient.class/replicationQueues.class config and remove table based ReplicationQueuesClient/ReplicationQueues implementation


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/65159dc2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/65159dc2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/65159dc2

Branch: refs/heads/branch-2
Commit: 65159dc256d6b51abadbd9abf50575ce72a6634e
Parents: 5f54814
Author: Guanghao Zhang <zghao@apache.org>
Authored: Mon Dec 25 11:44:18 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Tue Dec 26 14:44:02 2017 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationFactory.java   |  17 +-
 .../TableBasedReplicationQueuesClientImpl.java  | 113 -----
 .../TableBasedReplicationQueuesImpl.java        | 448 -----------------
 .../org/apache/hadoop/hbase/master/HMaster.java |  17 +-
 .../replication/TestMultiSlaveReplication.java  |   2 -
 .../TestReplicationStateHBaseImpl.java          | 495 -------------------
 .../replication/TestReplicationTableBase.java   | 109 ----
 ...tTableBasedReplicationSourceManagerImpl.java |  63 ---
 8 files changed, 12 insertions(+), 1252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 3ff6914..9f4ad18 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -31,21 +31,16 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 @InterfaceAudience.Private
 public class ReplicationFactory {
 
-  public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
-
   public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
       throws Exception {
-    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
-        "replication.replicationQueues.class", defaultReplicationQueueClass);
-    return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
+    return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
+      args);
   }
 
-  public static ReplicationQueuesClient getReplicationQueuesClient(
-      ReplicationQueuesClientArguments args) throws Exception {
-    Class<?> classToBuild = args.getConf().getClass(
-      "hbase.region.replica.replication.replicationQueuesClient.class",
-      ReplicationQueuesClientZKImpl.class);
-    return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
+  public static ReplicationQueuesClient
+      getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception {
+    return (ReplicationQueuesClient) ConstructorUtils
+        .invokeConstructor(ReplicationQueuesClientZKImpl.class, args);
   }
 
   public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
deleted file mode 100644
index 0a8ed31..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
- * the ReplicationTableBase to access the Replication Table.
- */
-@InterfaceAudience.Private
-public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
-  implements ReplicationQueuesClient {
-
-  public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
-    throws IOException {
-    super(args.getConf(), args.getAbortable());
-  }
-  public TableBasedReplicationQueuesClientImpl(Configuration conf,
-                                               Abortable abortable) throws IOException {
-    super(conf, abortable);
-  }
-
-  @Override
-  public void init() throws ReplicationException{
-    // no-op
-  }
-
-  @Override
-  public List<String> getListOfReplicators() {
-    return super.getListOfReplicators();
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
-    return super.getLogsInQueue(serverName, queueId);
-  }
-
-  @Override
-  public List<String> getAllQueues(String serverName) {
-    return super.getAllQueues(serverName);
-  }
-
-  @Override
-  public Set<String> getAllWALs() {
-    Set<String> allWals = new HashSet<>();
-    ResultScanner allQueues = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      allQueues = replicationTable.getScanner(new Scan());
-      for (Result queue : allQueues) {
-        for (String wal : readWALsFromResult(queue)) {
-          allWals.add(wal);
-        }
-      }
-    } catch (IOException e) {
-      String errMsg = "Failed getting all WAL's in Replication Table";
-      abortable.abort(errMsg, e);
-    } finally {
-      if (allQueues != null) {
-        allQueues.close();
-      }
-    }
-    return allWals;
-  }
-
-  @Override
-  public int getHFileRefsNodeChangeVersion() throws KeeperException {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  @Override
-  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  @Override
-  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
deleted file mode 100644
index b6c849c..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * This class provides an implementation of the ReplicationQueues interface using an HBase table
- * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
- */
-@InterfaceAudience.Private
-public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
-  implements ReplicationQueues {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TableBasedReplicationQueuesImpl.class);
-
-  // Common byte values used in replication offset tracking
-  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
-  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
-
-  private String serverName = null;
-  private byte[] serverNameBytes = null;
-
-  // TODO: Only use this variable temporarily. Eventually we want to use HBase to store all
-  // TODO: replication information
-  private ReplicationStateZKBase replicationState;
-
-  public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
-    this(args.getConf(), args.getAbortable(), args.getZk());
-  }
-
-  public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZKWatcher zkw)
-    throws IOException {
-    super(conf, abort);
-    replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
-  }
-
-  @Override
-  public void init(String serverName) throws ReplicationException {
-    this.serverName = serverName;
-    this.serverNameBytes = Bytes.toBytes(serverName);
-  }
-
-  @Override
-  public List<String> getListOfReplicators() {
-    return super.getListOfReplicators();
-  }
-
-  @Override
-  public void removeQueue(String queueId) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      if (checkQueueExists(queueId)) {
-        Delete deleteQueue = new Delete(rowKey);
-        safeQueueUpdate(deleteQueue);
-      } else {
-        LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " +
-            "from the replication table while removing the queue");
-      }
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing queue queueId=" + queueId;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void addLog(String queueId, String filename) throws ReplicationException {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      if (!checkQueueExists(queueId)) {
-        // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
-        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
-        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
-        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
-        putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
-        replicationTable.put(putNewQueue);
-      } else {
-        // Otherwise simply add the new log and offset as a new column
-        Put putNewLog = new Put(queueIdToRowKey(queueId));
-        putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
-        safeQueueUpdate(putNewLog);
-      }
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void removeLog(String queueId, String filename) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Delete delete = new Delete(rowKey);
-      delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
-      safeQueueUpdate(delete);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void setLogPosition(String queueId, String filename, long position) {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      // Check that the log exists. addLog() must have been called before setLogPosition().
-      Get checkLogExists = new Get(rowKey);
-      checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
-      if (!replicationTable.exists(checkLogExists)) {
-        String errMsg = "Could not set position of non-existent log from queueId=" + queueId +
-          ", filename=" + filename;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
-      // Update the log offset if it exists
-      Put walAndOffset = new Put(rowKey);
-      walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
-      safeQueueUpdate(walAndOffset);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed writing log position queueId=" + queueId + "filename=" +
-        filename + " position=" + position;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public long getLogPosition(String queueId, String filename) throws ReplicationException {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Get getOffset = new Get(rowKey);
-      getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
-      Result result = getResultIfOwner(getOffset);
-      if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
-        throw new ReplicationException("Could not read empty result while getting log position " +
-          "queueId=" + queueId + ", filename=" + filename);
-      }
-      return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
-    } catch (IOException e) {
-      throw new ReplicationException("Could not get position in log for queueId=" + queueId +
-        ", filename=" + filename);
-    }
-  }
-
-  @Override
-  public void removeAllQueues() {
-    List<String> myQueueIds = getAllQueues();
-    for (String queueId : myQueueIds) {
-      removeQueue(queueId);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String queueId) {
-    String errMsg = "Failed getting logs in queue queueId=" + queueId;
-    byte[] rowKey = queueIdToRowKey(queueId);
-    List<String> logs = new ArrayList<>();
-    try {
-      Get getQueue = new Get(rowKey);
-      Result queue = getResultIfOwner(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
-            Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
-        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
-        return null;
-      }
-      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-      for(byte[] cQualifier : familyMap.keySet()) {
-        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
-            COL_QUEUE_OWNER_HISTORY)) {
-          continue;
-        }
-        logs.add(Bytes.toString(cQualifier));
-      }
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-    return logs;
-  }
-
-  @Override
-  public List<String> getAllQueues() {
-    return getAllQueues(serverName);
-  }
-
-  @Override public List<String> getUnClaimedQueueIds(String regionserver) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
-      List<String> res = new ArrayList<>();
-      for (Result queue : queuesToClaim) {
-        String rowKey = Bytes.toString(queue.getRow());
-        res.add(rowKey);
-      }
-      return res.isEmpty() ? null : res;
-    } catch (IOException e) {
-      String errMsg = "Failed getUnClaimedQueueIds";
-      abortable.abort(errMsg, e);
-    }
-    return null;
-  }
-
-  @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) {
-    // Do nothing here
-  }
-
-  @Override
-  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-
-    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){
-      for (Result queue : queuesToClaim) {
-        String rowKey = Bytes.toString(queue.getRow());
-        if (!rowKey.equals(queueId)){
-          continue;
-        }
-        if (attemptToClaimQueue(queue, regionserver)) {
-          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
-          if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
-            SortedSet<String> sortedLogs = new TreeSet<>();
-            List<String> logs = getLogsInQueue(queue.getRow());
-            for (String log : logs) {
-              sortedLogs.add(log);
-            }
-            LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
-            return new Pair<>(rowKey, sortedLogs);
-          } else {
-            // Delete orphaned queues
-            removeQueue(Bytes.toString(queue.getRow()));
-            LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " +
-              regionserver);
-          }
-        }
-      }
-    } catch (IOException | KeeperException e) {
-      String errMsg = "Failed claiming queues for regionserver=" + regionserver;
-      abortable.abort(errMsg, e);
-    }
-    return null;
-  }
-
-  @Override
-  public boolean isThisOurRegionServer(String regionserver) {
-    return this.serverName.equals(regionserver);
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) {
-    // TODO
-    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
-  }
-
-  private String buildQueueRowKey(String queueId) {
-    return buildQueueRowKey(serverName, queueId);
-  }
-
-  /**
-   * Convenience method that gets the row key of the queue specified by queueId
-   * @param queueId queueId of a queue in this server
-   * @return the row key of the queue in the Replication Table
-   */
-  private byte[] queueIdToRowKey(String queueId) {
-    return queueIdToRowKey(serverName, queueId);
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param put Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
-    RowMutations mutations = new RowMutations(put.getRow());
-    mutations.add(put);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param delete Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Delete delete) throws ReplicationException,
-    IOException{
-    RowMutations mutations = new RowMutations(delete.getRow());
-    mutations.add(delete);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column
-   * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost
-   * ownership of the column or an IO Exception has occurred during the transaction.
-   *
-   * @param mutate Mutation to perform on a given queue
-   */
-  private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE)
-          .qualifier(COL_QUEUE_OWNER).ifEquals(serverNameBytes).thenMutate(mutate);
-      if (!updateSuccess) {
-        throw new ReplicationException("Failed to update Replication Table because we lost queue " +
-            " ownership");
-      }
-    }
-  }
-
-  /**
-   * Check if the queue specified by queueId is stored in HBase
-   *
-   * @param queueId Either raw or reclaimed format of the queueId
-   * @return Whether the queue is stored in HBase
-   * @throws IOException
-   */
-  private boolean checkQueueExists(String queueId) throws IOException {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      return replicationTable.exists(new Get(rowKey));
-    }
-  }
-
-  /**
-   * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
-   * recently killed server is still the OWNER before we claim it.
-   *
-   * @param queue The queue that we are trying to claim
-   * @param originalServer The server that originally owned the queue
-   * @return Whether we successfully claimed the queue
-   * @throws IOException
-   */
-  private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{
-    Put putQueueNameAndHistory = new Put(queue.getRow());
-    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
-    String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
-      COL_QUEUE_OWNER_HISTORY)), originalServer);
-    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
-        Bytes.toBytes(newOwnerHistory));
-    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
-    claimAndRenameQueue.add(putQueueNameAndHistory);
-    // Attempt to claim ownership for this queue by checking if the current OWNER is the original
-    // server. If it is not then another RS has already claimed it. If it is we set ourselves as the
-    // new owner and update the queue's history
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE)
-          .qualifier(COL_QUEUE_OWNER).ifEquals(Bytes.toBytes(originalServer))
-          .thenMutate(claimAndRenameQueue);
-      return success;
-    }
-  }
-
-  /**
-   * Attempts to run a Get on some queue. Will only return a non-null result if we currently own
-   * the queue.
-   *
-   * @param get The Get that we want to query
-   * @return The result of the Get if this server is the owner of the queue. Else it returns null.
-   * @throws IOException
-   */
-  private Result getResultIfOwner(Get get) throws IOException {
-    Scan scan = new Scan(get);
-    // Check if the Get currently contains all columns or only specific columns
-    if (scan.getFamilyMap().size() > 0) {
-      // Add the OWNER column if the scan is already only over specific columns
-      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    }
-    scan.setMaxResultSize(1);
-    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-    CompareOperator.EQUAL, serverNameBytes);
-    scan.setFilter(checkOwner);
-    ResultScanner scanner = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      scanner = replicationTable.getScanner(scan);
-      Result result = scanner.next();
-      return (result == null || result.isEmpty()) ? null : result;
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cf95030..efa17a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -157,10 +157,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -1148,15 +1146,12 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
 
     // Start replication zk node cleaner
-    if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
-      ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
-      try {
-        replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
-            new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
-        getChoreService().scheduleChore(replicationZKNodeCleanerChore);
-      } catch (Exception e) {
-        LOG.error("start replicationZKNodeCleanerChore failed", e);
-      }
+    try {
+      replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
+          new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
+      getChoreService().scheduleChore(replicationZKNodeCleanerChore);
+    } catch (Exception e) {
+      LOG.error("start replicationZKNodeCleanerChore failed", e);
     }
     replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
     getChoreService().scheduleChore(replicationMetaCleaner);

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 9da0745..c57d9bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -94,8 +94,6 @@ public class TestMultiSlaveReplication {
     conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
         "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
     conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
-    conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
-        ReplicationQueuesZKImpl.class, ReplicationQueues.class);
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
deleted file mode 100644
index 1ef525f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.List;
-
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationStateHBaseImpl {
-
-  private static Configuration conf;
-  private static HBaseTestingUtility utility;
-  private static ZKWatcher zkw;
-  private static String replicationZNode;
-
-  private static ReplicationQueues rq1;
-  private static ReplicationQueues rq2;
-  private static ReplicationQueues rq3;
-  private static ReplicationQueuesClient rqc;
-  private static ReplicationPeers rp;
-
-
-  private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L)
-    .toString();
-  private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L)
-    .toString();
-  private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
-    .toString();
-  private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
-    .toString();
-
-  private static DummyServer ds0;
-  private static DummyServer ds1;
-  private static DummyServer ds2;
-  private static DummyServer ds3;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    utility = new HBaseTestingUtility();
-    conf = utility.getConfiguration();
-    conf.setClass("hbase.region.replica.replication.replicationQueues.class",
-      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
-    conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
-        TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
-    utility.startMiniCluster();
-    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
-  }
-
-  @Before
-  public void setUp() {
-    try {
-      ds0 = new DummyServer(server0);
-      rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
-        conf, ds0));
-      ds1 = new DummyServer(server1);
-      rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
-      rq1.init(server1);
-      ds2 = new DummyServer(server2);
-      rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
-      rq2.init(server2);
-      ds3 = new DummyServer(server3);
-      rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
-      rq3.init(server3);
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
-      rp.init();
-    } catch (Exception e) {
-      fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage());
-    }
-  }
-
-  @Test
-  public void checkNamingSchema() throws Exception {
-    assertTrue(rq1.isThisOurRegionServer(server1));
-    assertTrue(!rq1.isThisOurRegionServer(server1 + "a"));
-    assertTrue(!rq1.isThisOurRegionServer(null));
-  }
-
-  @Test
-  public void testSingleReplicationQueuesHBaseImpl() {
-    try {
-      // Test adding in WAL files
-      assertEquals(0, rq1.getAllQueues().size());
-      rq1.addLog("Queue1", "WALLogFile1.1");
-      assertEquals(1, rq1.getAllQueues().size());
-      rq1.addLog("Queue1", "WALLogFile1.2");
-      rq1.addLog("Queue1", "WALLogFile1.3");
-      rq1.addLog("Queue1", "WALLogFile1.4");
-      rq1.addLog("Queue2", "WALLogFile2.1");
-      rq1.addLog("Queue3", "WALLogFile3.1");
-      assertEquals(3, rq1.getAllQueues().size());
-      assertEquals(4, rq1.getLogsInQueue("Queue1").size());
-      assertEquals(1, rq1.getLogsInQueue("Queue2").size());
-      assertEquals(1, rq1.getLogsInQueue("Queue3").size());
-      // Make sure that abortCount is still 0
-      assertEquals(0, ds1.getAbortCount());
-      // Make sure that getting a log from a non-existent queue triggers an abort
-      assertNull(rq1.getLogsInQueue("Queue4"));
-      assertEquals(1, ds1.getAbortCount());
-    } catch (ReplicationException e) {
-      e.printStackTrace();
-      fail("testAddLog received a ReplicationException");
-    }
-    try {
-
-      // Test updating the log positions
-      assertEquals(0L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
-      rq1.setLogPosition("Queue1", "WALLogFile1.1", 123L);
-      assertEquals(123L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
-      rq1.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
-      assertEquals(123456789L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
-      rq1.setLogPosition("Queue2", "WALLogFile2.1", 242L);
-      assertEquals(242L, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
-      rq1.setLogPosition("Queue3", "WALLogFile3.1", 243L);
-      assertEquals(243L, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
-
-      // Test that setting log positions in non-existing logs will cause an abort
-      assertEquals(1, ds1.getAbortCount());
-      rq1.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
-      assertEquals(2, ds1.getAbortCount());
-      rq1.setLogPosition("NotHereQueue", "NotHereFile", 243L);
-      assertEquals(3, ds1.getAbortCount());
-      rq1.setLogPosition("Queue1", "NotHereFile", 243l);
-      assertEquals(4, ds1.getAbortCount());
-
-      // Test reading log positions for non-existent queues and WAL's
-      try {
-        rq1.getLogPosition("Queue1", "NotHereWAL");
-        fail("Replication queue should have thrown a ReplicationException for reading from a " +
-          "non-existent WAL");
-      } catch (ReplicationException e) {
-      }
-      try {
-        rq1.getLogPosition("NotHereQueue", "NotHereWAL");
-        fail("Replication queue should have thrown a ReplicationException for reading from a " +
-          "non-existent queue");
-      } catch (ReplicationException e) {
-      }
-      // Test removing logs
-      rq1.removeLog("Queue1", "WALLogFile1.1");
-      assertEquals(3, rq1.getLogsInQueue("Queue1").size());
-      // Test removing queues
-      rq1.removeQueue("Queue2");
-      assertEquals(2, rq1.getAllQueues().size());
-      assertNull(rq1.getLogsInQueue("Queue2"));
-      // Test that getting logs from a non-existent queue aborts
-      assertEquals(5, ds1.getAbortCount());
-      // Test removing all queues for a Region Server
-      rq1.removeAllQueues();
-      assertEquals(0, rq1.getAllQueues().size());
-      assertNull(rq1.getLogsInQueue("Queue1"));
-      // Test that getting logs from a non-existent queue aborts
-      assertEquals(6, ds1.getAbortCount());
-      // Test removing a non-existent queue does not cause an abort. This is because we can
-      // attempt to remove a queue that has no corresponding Replication Table row (if we never
-      // registered a WAL for it)
-      rq1.removeQueue("NotHereQueue");
-      assertEquals(6, ds1.getAbortCount());
-    } catch (ReplicationException e) {
-      e.printStackTrace();
-      fail("testAddLog received a ReplicationException");
-    }
-  }
-
-  @Test
-  public void TestMultipleReplicationQueuesHBaseImpl () {
-    try {
-      rp.registerPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
-      rp.registerPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
-      rp.registerPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
-    } catch (ReplicationException e) {
-      fail("Failed to add peers to ReplicationPeers");
-    }
-    try {
-      // Test adding in WAL files
-      rq1.addLog("Queue1", "WALLogFile1.1");
-      rq1.addLog("Queue1", "WALLogFile1.2");
-      rq1.addLog("Queue1", "WALLogFile1.3");
-      rq1.addLog("Queue1", "WALLogFile1.4");
-      rq1.addLog("Queue2", "WALLogFile2.1");
-      rq1.addLog("Queue3", "WALLogFile3.1");
-      rq2.addLog("Queue1", "WALLogFile1.1");
-      rq2.addLog("Queue1", "WALLogFile1.2");
-      rq2.addLog("Queue2", "WALLogFile2.1");
-      rq3.addLog("Queue1", "WALLogFile1.1");
-      // Test adding logs to replication queues
-      assertEquals(3, rq1.getAllQueues().size());
-      assertEquals(2, rq2.getAllQueues().size());
-      assertEquals(1, rq3.getAllQueues().size());
-      assertEquals(4, rq1.getLogsInQueue("Queue1").size());
-      assertEquals(1, rq1.getLogsInQueue("Queue2").size());
-      assertEquals(1, rq1.getLogsInQueue("Queue3").size());
-      assertEquals(2, rq2.getLogsInQueue("Queue1").size());
-      assertEquals(1, rq2.getLogsInQueue("Queue2").size());
-      assertEquals(1, rq3.getLogsInQueue("Queue1").size());
-    } catch (ReplicationException e) {
-      e.printStackTrace();
-      fail("testAddLogs received a ReplicationException");
-    }
-    try {
-      // Test setting and reading offset in queues
-      rq1.setLogPosition("Queue1", "WALLogFile1.1", 1l);
-      rq1.setLogPosition("Queue1", "WALLogFile1.2", 2l);
-      rq1.setLogPosition("Queue1", "WALLogFile1.3", 3l);
-      rq1.setLogPosition("Queue2", "WALLogFile2.1", 4l);
-      rq1.setLogPosition("Queue2", "WALLogFile2.2", 5l);
-      rq1.setLogPosition("Queue3", "WALLogFile3.1", 6l);
-      rq2.setLogPosition("Queue1", "WALLogFile1.1", 7l);
-      rq2.setLogPosition("Queue2", "WALLogFile2.1", 8l);
-      rq3.setLogPosition("Queue1", "WALLogFile1.1", 9l);
-      assertEquals(1l, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
-      assertEquals(2l, rq1.getLogPosition("Queue1", "WALLogFile1.2"));
-      assertEquals(4l, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
-      assertEquals(6l, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
-      assertEquals(7l, rq2.getLogPosition("Queue1", "WALLogFile1.1"));
-      assertEquals(8l, rq2.getLogPosition("Queue2", "WALLogFile2.1"));
-      assertEquals(9l, rq3.getLogPosition("Queue1", "WALLogFile1.1"));
-      assertEquals(rq1.getListOfReplicators().size(), 3);
-      assertEquals(rq2.getListOfReplicators().size(), 3);
-      assertEquals(rq3.getListOfReplicators().size(), 3);
-    } catch (ReplicationException e) {
-      fail("testAddLogs threw a ReplicationException");
-    }
-    try {
-      // Test claiming queues
-      List<String> claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2);
-      // Check to make sure that list of peers with outstanding queues is decremented by one
-      // after claimQueues
-      // Check to make sure that we claimed the proper number of queues
-      assertEquals(2, claimedQueuesFromRq2.size());
-      assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2));
-      assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2));
-      assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size());
-      assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size());
-      rq1.removeReplicatorIfQueueIsEmpty(server2);
-      assertEquals(rq1.getListOfReplicators().size(), 2);
-      assertEquals(rq2.getListOfReplicators().size(), 2);
-      assertEquals(rq3.getListOfReplicators().size(), 2);
-      assertEquals(5, rq1.getAllQueues().size());
-      // Check that all the logs in the other queue were claimed
-      assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size());
-      assertEquals(1, rq1.getLogsInQueue("Queue2-" + server2).size());
-      // Check that the offsets of the claimed queues are the same
-      assertEquals(7l, rq1.getLogPosition("Queue1-" + server2, "WALLogFile1.1"));
-      assertEquals(8l, rq1.getLogPosition("Queue2-" + server2, "WALLogFile2.1"));
-      // Check that the queues were properly removed from rq2
-      assertEquals(0, rq2.getAllQueues().size());
-      assertNull(rq2.getLogsInQueue("Queue1"));
-      assertNull(rq2.getLogsInQueue("Queue2"));
-      // Check that non-existent peer queues are not claimed
-      rq1.addLog("UnclaimableQueue", "WALLogFile1.1");
-      rq1.addLog("UnclaimableQueue", "WALLogFile1.2");
-      assertEquals(6, rq1.getAllQueues().size());
-      List<String> claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1);
-      for(String queue : claimedQueuesFromRq1) {
-        rq3.claimQueue(server1, queue);
-      }
-      rq3.removeReplicatorIfQueueIsEmpty(server1);
-      assertEquals(rq1.getListOfReplicators().size(), 1);
-      assertEquals(rq2.getListOfReplicators().size(), 1);
-      assertEquals(rq3.getListOfReplicators().size(), 1);
-      // Note that we do not pick up the queue: UnclaimableQueue which was not registered in
-      // Replication Peers
-      assertEquals(6, rq3.getAllQueues().size());
-      // Test claiming non-existing queues
-      List<String> noQueues = rq3.getUnClaimedQueueIds("NotARealServer");
-      assertNull(noQueues);
-      assertEquals(6, rq3.getAllQueues().size());
-      // Test claiming own queues
-      noQueues = rq3.getUnClaimedQueueIds(server3);
-      Assert.assertNull(noQueues);
-      assertEquals(6, rq3.getAllQueues().size());
-      // Check that rq3 still remain on list of replicators
-      assertEquals(1, rq3.getListOfReplicators().size());
-    } catch (ReplicationException e) {
-      fail("testClaimQueue threw a ReplicationException");
-    }
-  }
-
-  @Test
-  public void TestReplicationQueuesClient() throws Exception{
-
-    // Test ReplicationQueuesClient log tracking
-    rq1.addLog("Queue1", "WALLogFile1.1");
-    assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size());
-    rq1.removeLog("Queue1", "WALLogFile1.1");
-    assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size());
-    rq2.addLog("Queue2", "WALLogFile2.1");
-    rq2.addLog("Queue2", "WALLogFile2.2");
-    assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size());
-    rq3.addLog("Queue1", "WALLogFile1.1");
-    rq3.addLog("Queue3", "WALLogFile3.1");
-    rq3.addLog("Queue3", "WALLogFile3.2");
-
-    // Test ReplicationQueueClient log tracking for faulty cases
-    assertEquals(0, ds0.getAbortCount());
-    assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue"));
-    assertNull(rqc.getLogsInQueue(server1, "NotHereQueue"));
-    assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1"));
-    assertEquals(3, ds0.getAbortCount());
-    // Test ReplicationQueueClient replicators
-    List<String> replicators = rqc.getListOfReplicators();
-    assertEquals(3, replicators.size());
-    assertTrue(replicators.contains(server1));
-    assertTrue(replicators.contains(server2));
-    rq1.removeQueue("Queue1");
-    assertEquals(2, rqc.getListOfReplicators().size());
-
-    // Test ReplicationQueuesClient queue tracking
-    assertEquals(0, rqc.getAllQueues(server1).size());
-    rq1.addLog("Queue2", "WALLogFile2.1");
-    rq1.addLog("Queue3", "WALLogFile3.1");
-    assertEquals(2, rqc.getAllQueues(server1).size());
-    rq1.removeAllQueues();
-    assertEquals(0, rqc.getAllQueues(server1).size());
-
-    // Test ReplicationQueuesClient queue tracking for faulty cases
-    assertEquals(0, rqc.getAllQueues("NotHereServer").size());
-
-    // Test ReplicationQueuesClient get all WAL's
-    assertEquals(5 , rqc.getAllWALs().size());
-    rq3.removeLog("Queue1", "WALLogFile1.1");
-    assertEquals(4, rqc.getAllWALs().size());
-    rq3.removeAllQueues();
-    assertEquals(2, rqc.getAllWALs().size());
-    rq2.removeAllQueues();
-    assertEquals(0, rqc.getAllWALs().size());
-  }
-
-  @After
-  public void clearQueues() throws Exception{
-    rq1.removeAllQueues();
-    rq2.removeAllQueues();
-    rq3.removeAllQueues();
-    assertEquals(0, rq1.getAllQueues().size());
-    assertEquals(0, rq2.getAllQueues().size());
-    assertEquals(0, rq3.getAllQueues().size());
-    ds0.resetAbortCount();
-    ds1.resetAbortCount();
-    ds2.resetAbortCount();
-    ds3.resetAbortCount();
-  }
-
-  @After
-  public void tearDown() throws KeeperException, IOException {
-    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    utility.shutdownMiniCluster();
-    utility.shutdownMiniZKCluster();
-  }
-
-  static class DummyServer implements Server {
-    private String serverName;
-    private boolean isAborted = false;
-    private boolean isStopped = false;
-    private int abortCount = 0;
-
-    public DummyServer(String serverName) {
-      this.serverName = serverName;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public ZKWatcher getZooKeeper() {
-      return null;
-    }
-
-    @Override
-    public CoordinatedStateManager getCoordinatedStateManager() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getConnection() {
-      return null;
-    }
-
-    @Override
-    public MetaTableLocator getMetaTableLocator() {
-      return null;
-    }
-
-    @Override
-    public ServerName getServerName() {
-      return ServerName.valueOf(this.serverName);
-    }
-
-    @Override
-    public void abort(String why, Throwable e) {
-      abortCount++;
-      this.isAborted = true;
-    }
-
-    @Override
-    public boolean isAborted() {
-      return this.isAborted;
-    }
-
-    @Override
-    public void stop(String why) {
-      this.isStopped = true;
-    }
-
-    @Override
-    public boolean isStopped() {
-      return this.isStopped;
-    }
-
-    @Override
-    public ChoreService getChoreService() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getClusterConnection() {
-      return null;
-    }
-
-    public int getAbortCount() {
-      return abortCount;
-    }
-
-    public void resetAbortCount() {
-      abortCount = 0;
-    }
-
-    @Override
-    public FileSystem getFileSystem() {
-      return null;
-    }
-
-    @Override
-    public boolean isStopping() {
-      return false;
-    }
-
-    @Override
-    public Connection createConnection(Configuration conf) throws IOException {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
deleted file mode 100644
index 665eedb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization
- * should be non-blocking, but any method calls that access the table should be blocking.
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationTableBase {
-
-  private static long SLEEP_MILLIS = 5000;
-  private static long TIME_OUT_MILLIS = 3000;
-  private static Configuration conf;
-  private static HBaseTestingUtility utility;
-  private static ZKWatcher zkw;
-  private static ReplicationTableBase rb;
-  private static ReplicationQueues rq;
-  private static ReplicationQueuesClient rqc;
-  private volatile boolean asyncRequestSuccess = false;
-
-  @Test
-  public void testSlowStartup() throws Exception{
-    utility = new HBaseTestingUtility();
-    utility.startMiniZKCluster();
-    conf = utility.getConfiguration();
-    conf.setClass("hbase.region.replica.replication.replicationQueues.class",
-      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
-    conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
-      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
-    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
-    utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        rb = new ReplicationTableBase(conf, zkw) {};
-        rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
-          conf, zkw, zkw));
-        rqc = ReplicationFactory.getReplicationQueuesClient(
-          new ReplicationQueuesClientArguments(conf, zkw, zkw));
-        return true;
-      }
-      @Override
-      public String explainFailure() throws Exception {
-        return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient and " +
-          "TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS +
-          " ms. Their initialization " + "should be non-blocking";
-      }
-    });
-    final RequestReplicationQueueData async = new RequestReplicationQueueData();
-    async.start();
-    Thread.sleep(SLEEP_MILLIS);
-    // Test that the Replication Table has not been assigned and the methods are blocking
-    assertFalse(rb.getInitializationStatus());
-    assertFalse(asyncRequestSuccess);
-    utility.startMiniCluster();
-    // Test that the methods do return the correct results after getting the table
-    utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        async.join();
-        return true;
-      }
-      @Override
-      public String explainFailure() throws Exception {
-        return "ReplicationQueue failed to return list of replicators even after Replication Table "
-          + "was initialized timeout=" + TIME_OUT_MILLIS + " ms";
-      }
-    });
-    assertTrue(asyncRequestSuccess);
-  }
-
-  public class RequestReplicationQueueData extends Thread {
-    @Override
-    public void run() {
-      assertEquals(0, rq.getListOfReplicators().size());
-      asyncRequestSuccess = true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65159dc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
deleted file mode 100644
index 19457e2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
-import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-/**
- * Tests the ReplicationSourceManager with TableBasedReplicationQueue's and
- * TableBasedReplicationQueuesClient
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager {
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf = HBaseConfiguration.create();
-    conf.set("replication.replicationsource.implementation",
-      ReplicationSourceDummy.class.getCanonicalName());
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-
-    conf.setClass("hbase.region.replica.replication.replicationQueues.class",
-      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
-    conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
-      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
-    utility = new HBaseTestingUtility(conf);
-    utility.startMiniCluster();
-    Waiter.waitFor(conf, 3 * 1000,
-      () -> utility.getMiniHBaseCluster().getMaster().isInitialized());
-    utility.waitUntilAllRegionsAssigned(TableName.valueOf(
-        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
-    setupZkAndReplication();
-  }
-
-}


Mime
View raw message