hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1464276 [1/2] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/...
Date Thu, 04 Apr 2013 03:49:48 GMT
Author: stack
Date: Thu Apr  4 03:49:47 2013
New Revision: 1464276

URL: http://svn.apache.org/r1464276
Log:
HBASE-7568 [replication] Create an interface for replication queues

Added:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Maintains the replication queues owned by a single region server. Each queue is identified by a
+ * queueId and lists the HLog files that still have to be replicated to remote clusters, together
+ * with the current position reached in each of those files.
+ */
+public interface ReplicationQueues {
+
+  /**
+   * Bind this instance to the region server whose replication queues it manages.
+   * @param serverName server name of the region server that owns the queues handled here
+   */
+  void init(String serverName);
+
+  /**
+   * Delete the given replication queue.
+   * @param queueId identifier of the queue
+   */
+  void removeQueue(String queueId);
+
+  /**
+   * Append an HLog file to a queue, creating the queue first when it does not exist yet.
+   * @param queueId identifier of the queue
+   * @param filename name of the HLog file
+   * @throws KeeperException if the underlying ZooKeeper operation fails
+   */
+  void addLog(String queueId, String filename) throws KeeperException;
+
+  /**
+   * Drop an HLog file from a queue.
+   * @param queueId identifier of the queue
+   * @param filename name of the HLog file
+   */
+  void removeLog(String queueId, String filename);
+
+  /**
+   * Record the current position for an HLog of the given queue.
+   * @param queueId identifier of the queue
+   * @param filename name of the HLog file
+   * @param position current position within the file
+   */
+  void setLogPosition(String queueId, String filename, long position);
+
+  /**
+   * Read back the position recorded for an HLog of the given queue.
+   * @param queueId identifier of the queue
+   * @param filename name of the HLog file
+   * @return current position within the file
+   * @throws KeeperException if the underlying ZooKeeper operation fails
+   */
+  long getLogPosition(String queueId, String filename) throws KeeperException;
+
+  /**
+   * Delete every replication queue owned by this region server.
+   */
+  void removeAllQueues();
+
+  /**
+   * List the HLogs currently held in a queue.
+   * @param queueId identifier of the queue
+   * @return the HLog names, or null if this region server is dead and has no outstanding queues
+   */
+  List<String> getLogsInQueue(String queueId);
+
+  /**
+   * List every queue owned by this region server.
+   * @return the queueIds, or null if this region server is dead and has no outstanding queues
+   */
+  List<String> getAllQueues();
+
+  /**
+   * Take ownership of the queues that belonged to a dead region server.
+   * @param regionserver identifier of the dead region server
+   * @return the claimed queues, keyed by queueId and mapping to the sorted set of HLogs in each;
+   *         an empty map when no queues were failed over
+   */
+  SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
+
+  /**
+   * List every region server that has outstanding replication queues. Such servers may be alive,
+   * dead, or left over from an earlier run of the cluster.
+   * @return the server names
+   */
+  List<String> getListOfReplicators();
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+/**
+ * Read operations over replication queues, intended for replication clients. Each queue lists the
+ * HLogs that still have to be replicated to remote clusters.
+ */
+public interface ReplicationQueuesClient {
+
+  /**
+   * List every region server that has outstanding replication queues. Such servers may be alive,
+   * dead, or left over from an earlier run of the cluster.
+   * @return the server names
+   */
+  List<String> getListOfReplicators();
+
+  /**
+   * List the HLogs held in one queue of the given region server.
+   * @param serverName server name of the region server that owns the queue
+   * @param queueId identifier of the queue
+   * @return the HLog names, or null if this region server is dead and has no outstanding queues
+   */
+  List<String> getLogsInQueue(String serverName, String queueId);
+
+  /**
+   * List every queue owned by the given region server.
+   * @param serverName server name of the region server that owns the queues
+   * @return the queueIds, or null if this region server is not a replicator
+   */
+  List<String> getAllQueues(String serverName);
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper-backed {@link ReplicationQueuesClient}. Queues are read from the children of
+ * {@code queuesZNode/<serverName>} and HLogs from the children of
+ * {@code queuesZNode/<serverName>/<queueId>}.
+ */
+public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
+    ReplicationQueuesClient {
+
+  /**
+   * @param zk watcher used to talk to ZooKeeper
+   * @param conf configuration handed to the base class
+   * @param abortable used to abort when a ZooKeeper read fails
+   * @throws KeeperException if the parent znode of all queues cannot be created
+   */
+  public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) throws KeeperException {
+    super(zk, conf, abortable);
+    // Ensure the parent of all per-server queue znodes exists before anything lists it.
+    ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String serverName, String queueId) {
+    String queueZnode =
+        ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, serverName), queueId);
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, queueZnode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId
+          + " and serverName=" + serverName, e);
+      return null;
+    }
+  }
+
+  @Override
+  public List<String> getAllQueues(String serverName) {
+    String serverZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, serverZnode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      return null;
+    }
+  }
+
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,403 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper-backed implementation of {@link ReplicationQueues}. The queues of this region server
+ * live under {@code queuesZNode/<serverName>} (set by {@link #init(String)}); each child znode is a
+ * queue whose children are HLog names, and each HLog znode stores the replication position.
+ */
+public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
+
+  /** Znode containing all replication queues for this region server. */
+  private String myQueuesZnode;
+  /** Name of znode we use to lock during failover */
+  private final static String RS_LOCK_ZNODE = "lock";
+
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+
+  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
+      throws KeeperException {
+    super(zk, conf, abortable);
+  }
+
+  @Override
+  public void init(String serverName) {
+    this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+  }
+
+  @Override
+  public void removeQueue(String queueId) {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
+    }
+  }
+
+  @Override
+  public void addLog(String queueId, String filename) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    znode = ZKUtil.joinZNode(znode, filename);
+    ZKUtil.createWithParents(this.zookeeper, znode);
+  }
+
+  @Override
+  public void removeLog(String queueId, String filename) {
+    try {
+      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.deleteNode(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename="
+          + filename + ")", e);
+    }
+  }
+
+  @Override
+  public void setLogPosition(String queueId, String filename, long position) {
+    try {
+      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      // The position is written as a pb-magic-prefixed protobuf (see toByteArray), which lets
+      // parseHLogPositionFrom tell it apart from the raw-long encoding it also accepts.
+      ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+          + ", position=" + position + ")", e);
+    }
+  }
+
+  @Override
+  public long getLogPosition(String queueId, String filename) throws KeeperException {
+    String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    String znode = ZKUtil.joinZNode(clusterZnode, filename);
+    byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+    try {
+      return parseHLogPositionFrom(bytes);
+    } catch (DeserializationException de) {
+      LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+          + " znode content, continuing.", de);
+    }
+    // If we can not parse the position (including when no content was read), start at the
+    // beginning of the hlog file again.
+    return 0;
+  }
+
+  @Override
+  public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
+    SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
+    // check whether there is multi support. If yes, use it.
+    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+      LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue");
+      newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
+    } else {
+      LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue");
+      if (!lockOtherRS(regionserverZnode)) {
+        return newQueues;
+      }
+      newQueues = copyQueuesFromRS(regionserverZnode);
+      deleteAnotherRSQueues(regionserverZnode);
+    }
+    return newQueues;
+  }
+
+  @Override
+  public void removeAllQueues() {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
+    } catch (KeeperException e) {
+      // if the znode is already expired, don't bother going further
+      if (e instanceof KeeperException.SessionExpiredException) {
+        return;
+      }
+      this.abortable.abort("Failed to delete replication queues for region server: "
+          + this.myQueuesZnode, e);
+    }
+  }
+
+  @Override
+  public List<String> getLogsInQueue(String queueId) {
+    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e);
+    }
+    return result;
+  }
+
+  @Override
+  public List<String> getAllQueues() {
+    List<String> listOfQueues = null;
+    try {
+      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get a list of queues for region server: "
+          + this.myQueuesZnode, e);
+    }
+    return listOfQueues;
+  }
+
+  /**
+   * Try to set a lock in another region server's znode.
+   * @param znode the server name of the other region server
+   * @return true if the lock was acquired, false in every other case
+   */
+  private boolean lockOtherRS(String znode) {
+    try {
+      String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+      if (parent.equals(this.myQueuesZnode)) {
+        LOG.warn("Won't lock because this is us, we're dead!");
+        return false;
+      }
+      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+      ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
+    } catch (KeeperException e) {
+      // This exception will pop up if the znode under which we're trying to
+      // create the lock is already deleted by another region server, meaning
+      // that the transfer already occurred.
+      // NoNode => transfer is done and znodes are already deleted
+      // NodeExists => lock znode already created by another RS
+      if (e instanceof KeeperException.NoNodeException
+          || e instanceof KeeperException.NodeExistsException) {
+        LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
+            + e.getMessage());
+      } else {
+        LOG.info("Failed lock other rs", e);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Delete all the replication queues for a given region server.
+   * @param regionserverZnode The znode of the region server to delete.
+   */
+  private void deleteAnotherRSQueues(String regionserverZnode) {
+    String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
+    try {
+      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
+      for (String cluster : clusters) {
+        // No need to delete, it will be deleted later.
+        if (cluster.equals(RS_LOCK_ZNODE)) {
+          continue;
+        }
+        String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
+        ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
+      }
+      // Finish cleaning up
+      ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
+    } catch (KeeperException e) {
+      if (e instanceof KeeperException.NoNodeException
+          || e instanceof KeeperException.NotEmptyException) {
+        // Testing a special case where another region server was able to
+        // create a lock just after we deleted it, but then was also able to
+        // delete the RS znode before us or its lock znode is still there.
+        // Compare from fullpath so a KeeperException without a path cannot NPE here.
+        if (fullpath.equals(e.getPath())) {
+          return;
+        }
+      }
+      this.abortable.abort("Failed to delete replication queues for region server: "
+          + regionserverZnode, e);
+    }
+  }
+
+  /**
+   * It "atomically" copies all the hlogs queues from another region server and returns them all
+   * sorted per peer cluster (appended with the dead server's znode).
+   * @param znode pertaining to the region server to copy the queues from
+   * @return HLog queues sorted per peer cluster; empty if the multi call failed
+   */
+  private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    // hbase/replication/rs/deadrs
+    String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+    List<String> peerIdsToProcess = null;
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    try {
+      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+      if (peerIdsToProcess == null) return queues; // node already processed
+      for (String peerId : peerIdsToProcess) {
+        String newPeerId = peerId + "-" + znode;
+        String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
+        // check the logs queue for the old peer cluster
+        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+        if (hlogs == null || hlogs.size() == 0) {
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+          continue; // empty log queue.
+        }
+        // create the new cluster znode
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newPeerId, logQueue);
+        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+        listOfOps.add(op);
+        // get the offset of the logs and set it to new znodes
+        for (String hlog : hlogs) {
+          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          // add ops for deleting
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+          logQueue.add(hlog);
+        }
+        // add delete op for peer
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      }
+      // add delete op for dead rs
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+      LOG.debug(" The multi list size is: " + listOfOps.size());
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+      LOG.info("Atomically moved the dead regionserver logs. ");
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+      queues.clear();
+    }
+    return queues;
+  }
+
+  /**
+   * This method copies all the hlogs queues from another region server and returns them all sorted
+   * per peer cluster (appended with the dead server's znode)
+   * @param znode server names to copy
+   * @return all hlogs for all peers of that cluster; never null, but possibly partial if a
+   *         ZooKeeper error occurred (in which case the abortable has been notified)
+   */
+  private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    try {
+      String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster + "-" + znode;
+        String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
+        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+          HConstants.EMPTY_BYTE_ARRAY);
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String z = ZKUtil.joinZNode(clusterPath, hlog);
+          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+          long position = 0;
+          try {
+            position = parseHLogPositionFrom(positionBytes);
+          } catch (DeserializationException e) {
+            LOG.warn("Failed parse of hlog position from the following znode: " + z, e);
+          }
+          LOG.debug("Creating " + hlog + " with data " + position);
+          String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+          // Position doesn't actually change, we are just deserializing it for
+          // logging, so just use the already serialized version
+          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Copy queues from rs", e);
+    }
+    return queues;
+  }
+
+  /**
+   * @param lockOwner
+   * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
+   *         for use as content of an replication lock during region server fail over.
+   */
+  static byte[] lockToByteArray(final String lockOwner) {
+    byte[] bytes =
+        ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param position
+   * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
+   *         for use as content of an hlog position in a replication queue.
+   */
+  static byte[] toByteArray(final long position) {
+    byte[] bytes =
+        ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
+            .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * Decode the content of an HLog position znode. Accepts both the pb-magic-prefixed protobuf form
+   * written by {@link #toByteArray(long)} and a raw long; an empty array decodes to 0.
+   * @param bytes - Content of a HLog position znode; may be null.
+   * @return long - The current HLog position.
+   * @throws DeserializationException if <code>bytes</code> is null or the protobuf is malformed
+   */
+  private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+    if (bytes == null) {
+      // Callers pass ZKUtil.getData results, which may be null; report that as unparseable
+      // (callers already fall back to position 0) instead of failing with an NPE below.
+      throw new DeserializationException("Unable to parse null HLog position.");
+    }
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
+          ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
+      ZooKeeperProtos.ReplicationHLogPosition position;
+      try {
+        position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return position.getPosition();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toLong(bytes);
+      }
+      return 0;
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Thu Apr  4 03:49:47 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replicat
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -37,34 +38,37 @@ import java.util.concurrent.atomic.Atomi
  * ReplicationStateImpl is responsible for maintaining the replication state
  * znode.
  */
-public class ReplicationStateImpl implements ReplicationStateInterface {
+public class ReplicationStateImpl extends ReplicationStateZKBase implements
+    ReplicationStateInterface {
 
-  private ReplicationStateTracker stateTracker;
-  private final String stateZnode;
-  private final ZooKeeperWatcher zookeeper;
-  private final Abortable abortable;
+  private final ReplicationStateTracker stateTracker;
   private final AtomicBoolean replicating;
 
   private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
 
-  public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
+  public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
       final Abortable abortable, final AtomicBoolean replicating) {
-    this.zookeeper = zk;
-    this.stateZnode = stateZnode;
-    this.abortable = abortable;
+    super(zk, conf, abortable);
     this.replicating = replicating;
 
     // Set a tracker on replicationStateNode
-    this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
-        this.abortable);
+    this.stateTracker =
+        new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
     stateTracker.start();
     readReplicationStateZnode();
   }
 
+  public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
+      final Abortable abortable) {
+    this(zk, conf, abortable, new AtomicBoolean());
+  }
+
+  @Override
   public boolean getState() throws KeeperException {
     return getReplication();
   }
 
+  @Override
   public void setState(boolean newState) throws KeeperException {
     setReplicating(newState);
   }
@@ -110,10 +114,10 @@ public class ReplicationStateImpl implem
    * @param newState
    */
   private void setReplicating(boolean newState) throws KeeperException {
-    ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
+    ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
     byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
         : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
-    ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
+    ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
   }
 
   /**
@@ -143,7 +147,7 @@ public class ReplicationStateImpl implem
       this.replicating.set(getReplication());
       LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
     } catch (KeeperException e) {
-      this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
+      this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
     }
   }
 

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * Base class for classes that keep replication state in ZooKeeper. On construction it derives all
+ * replication znode paths from the configuration (falling back to default znode names) and keeps
+ * the ZooKeeper watcher, configuration and abort handler for use by subclasses.
+ */
+public abstract class ReplicationStateZKBase {
+
+  /** Handle used for all ZooKeeper reads and writes done by this class and its subclasses. */
+  protected final ZooKeeperWatcher zookeeper;
+  /** Configuration from which the znode names and the local cluster key are read. */
+  protected final Configuration conf;
+  /** Notified when a ZooKeeper operation fails. */
+  protected final Abortable abortable;
+
+  /** The cluster key of the local cluster */
+  protected final String ourClusterKey;
+  /** The name of the base znode that contains all replication state. */
+  protected final String replicationZNode;
+  /** The name of the znode that contains the replication status of the local cluster. */
+  protected final String stateZNode;
+  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
+  protected final String peersZNode;
+  /** The name of the znode that contains all replication queues */
+  protected final String queuesZNode;
+  /**
+   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
+   * cluster.
+   */
+  protected final String peerStateNodeName;
+
+  /**
+   * Resolves the replication znode layout.
+   * @param zookeeper watcher whose {@code baseZNode} is the parent of the replication znode
+   * @param conf configuration that may override the default replication znode names
+   * @param abortable invoked when a ZooKeeper operation fails
+   */
+  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
+      Abortable abortable) {
+    this.zookeeper = zookeeper;
+    this.conf = conf;
+    this.abortable = abortable;
+    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(conf);
+    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+
+    // Every replication znode lives under <baseZNode>/<replication znode name>.
+    String base = ZKUtil.joinZNode(zookeeper.baseZNode,
+        conf.get("zookeeper.znode.replication", "replication"));
+    this.replicationZNode = base;
+    this.stateZNode =
+        ZKUtil.joinZNode(base, conf.get("zookeeper.znode.replication.state", "state"));
+    this.peersZNode =
+        ZKUtil.joinZNode(base, conf.get("zookeeper.znode.replication.peers", "peers"));
+    this.queuesZNode =
+        ZKUtil.joinZNode(base, conf.get("zookeeper.znode.replication.rs", "rs"));
+  }
+
+  /**
+   * Lists the replicators that own queues under the queues znode. These replicators may be alive,
+   * dead, or left over from a previous run.
+   * @return the child names returned by {@code ZKUtil.listChildrenNoWatch} for the queues znode,
+   *         or {@code null} if a KeeperException occurred (the abortable has then been notified)
+   */
+  public List<String> getListOfReplicators() {
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of replicators", e);
+      return null;
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Thu Apr  4 03:49:47 2013
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -51,7 +49,6 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -88,8 +85,6 @@ import java.util.concurrent.atomic.Atomi
 public class ReplicationZookeeper implements Closeable {
   private static final Log LOG =
     LogFactory.getLog(ReplicationZookeeper.class);
-  // Name of znode we use to lock when failover
-  private final static String RS_LOCK_ZNODE = "lock";
 
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
@@ -114,6 +109,7 @@ public class ReplicationZookeeper implem
   // Abortable
   private Abortable abortable;
   private final ReplicationStateInterface replicationState;
+  private final ReplicationQueues replicationQueues;
 
   /**
    * ZNode content if enabled state.
@@ -139,8 +135,10 @@ public class ReplicationZookeeper implem
     this.conf = conf;
     this.zookeeper = zk;
     setZNodes(abortable);
-    this.replicationState =
-        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
+    this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
+    // TODO This interface is no longer used by anyone using this constructor. When this class goes
+    // away, we will no longer have this null initialization business
+    this.replicationQueues = null;
   }
 
   /**
@@ -149,7 +147,7 @@ public class ReplicationZookeeper implem
    * @param server
    * @param replicating    atomic boolean to start/stop replication
    * @throws IOException
-   * @throws KeeperException 
+   * @throws KeeperException
    */
   public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
   throws IOException, KeeperException {
@@ -158,13 +156,14 @@ public class ReplicationZookeeper implem
     this.conf = server.getConfiguration();
     setZNodes(server);
 
-    this.replicationState =
-        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
+    this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
     this.peerClusters = new HashMap<String, ReplicationPeer>();
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+    this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
+    this.replicationQueues.init(server.getServerName().toString());
     connectExistingPeers();
   }
 
@@ -433,32 +432,6 @@ public class ReplicationZookeeper implem
   }
 
   /**
-   * @param position
-   * @return Serialized protobuf of <code>position</code> with pb magic prefix
-   *         prepended suitable for use as content of an hlog position in a
-   *         replication queue.
-   */
-  static byte[] toByteArray(
-      final long position) {
-    byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
-        .build().toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
-   * @param lockOwner
-   * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
-   *         prepended suitable for use as content of an replication lock during
-   *         region server fail over.
-   */
-  static byte[] lockToByteArray(
-      final String lockOwner) {
-    byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
-        .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
    * @param bytes Content of a peer znode.
    * @return ClusterKey parsed from the passed bytes.
    * @throws DeserializationException
@@ -503,58 +476,6 @@ public class ReplicationZookeeper implem
     }
   }
 
-  /**
-   * @param bytes - Content of a HLog position znode.
-   * @return long - The current HLog position.
-   * @throws DeserializationException
-   */
-  static long parseHLogPositionFrom(
-      final byte[] bytes) throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
-          .newBuilder();
-      ZooKeeperProtos.ReplicationHLogPosition position;
-      try {
-        position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new DeserializationException(e);
-      }
-      return position.getPosition();
-    } else {
-      if (bytes.length > 0) {
-        return Bytes.toLong(bytes);
-      }
-      return 0;
-    }
-  }
-
-  /**
-   * @param bytes - Content of a lock znode.
-   * @return String - The owner of the lock.
-   * @throws DeserializationException
-   */
-  static String parseLockOwnerFrom(
-      final byte[] bytes) throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
-          .newBuilder();
-      ZooKeeperProtos.ReplicationLock lock;
-      try {
-        lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new DeserializationException(e);
-      }
-      return lock.getLockOwner();
-    } else {
-      if (bytes.length > 0) {
-        return Bytes.toString(bytes);
-      }
-      return "";
-    }
-  }
-
   private boolean peerExists(String id) throws KeeperException {
     return ZKUtil.checkExists(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
@@ -624,10 +545,6 @@ public class ReplicationZookeeper implem
     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
 
-  private String getRepStateNode() {
-    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
-  }
-
   /**
    * Get the replication status of this cluster. If the state znode doesn't exist it will also
    * create it and set it true.
@@ -652,11 +569,8 @@ public class ReplicationZookeeper implem
    * @param filename name of the hlog's znode
    * @param peerId name of the cluster's znode
    */
-  public void addLogToList(String filename, String peerId)
-    throws KeeperException {
-    String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
-    znode = ZKUtil.joinZNode(znode, filename);
-    ZKUtil.createWithParents(this.zookeeper, znode);
+  public void addLogToList(String filename, String peerId) throws KeeperException {
+    this.replicationQueues.addLog(peerId, filename);
   }
 
   /**
@@ -665,13 +579,7 @@ public class ReplicationZookeeper implem
    * @param clusterId name of the cluster's znode
    */
   public void removeLogFromList(String filename, String clusterId) {
-    try {
-      String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed remove from list", e);
-    }
+    this.replicationQueues.removeLog(clusterId, filename);
   }
 
   /**
@@ -679,18 +587,9 @@ public class ReplicationZookeeper implem
    * @param filename filename name of the hlog's znode
    * @param clusterId clusterId name of the cluster's znode
    * @param position the position in the file
-   * @throws IOException
    */
-  public void writeReplicationStatus(String filename, String clusterId,
-      long position) {
-    try {
-      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      // Why serialize String of Long and note Long as bytes?
-      ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
-    } catch (KeeperException e) {
-      this.abortable.abort("Writing replication status", e);
-    }
+  public void writeReplicationStatus(String filename, String clusterId, long position) {
+    this.replicationQueues.setLogPosition(clusterId, filename, position);
   }
 
   /**
@@ -709,202 +608,15 @@ public class ReplicationZookeeper implem
     return result;
   }
 
-  /**
-   * Get the list of the replicators that have queues, they can be alive, dead
-   * or simply from a previous run
-   * @return a list of server names
-   */
-  public List<String> getListOfReplicators() {
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Get list of replicators", e);
-    }
-    return result;
-  }
 
   /**
-   * Get the list of peer clusters for the specified server names
-   * @param rs server names of the rs
-   * @return a list of peer cluster
+   * Take ownership for the set of queues belonging to a dead region server.
+   * @param regionserver the id of the dead region server
+   * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
+   *         each queue.
    */
-  public List<String> getListPeersForRS(String rs) {
-    String znode = ZKUtil.joinZNode(rsZNode, rs);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Get list of peers for rs", e);
-    }
-    return result;
-  }
-
-  /**
-   * Get the list of hlogs for the specified region server and peer cluster
-   * @param rs server names of the rs
-   * @param id peer cluster
-   * @return a list of hlogs
-   */
-  public List<String> getListHLogsForPeerForRS(String rs, String id) {
-    String znode = ZKUtil.joinZNode(rsZNode, rs);
-    znode = ZKUtil.joinZNode(znode, id);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Get list of hlogs for peer", e);
-    }
-    return result;
-  }
-
-  /**
-   * Try to set a lock in another server's znode.
-   * @param znode the server names of the other server
-   * @return true if the lock was acquired, false in every other cases
-   */
-  public boolean lockOtherRS(String znode) {
-    try {
-      String parent = ZKUtil.joinZNode(this.rsZNode, znode);
-      if (parent.equals(rsServerNameZnode)) {
-        LOG.warn("Won't lock because this is us, we're dead!");
-        return false;
-      }
-      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
-      ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
-    } catch (KeeperException e) {
-      // This exception will pop up if the znode under which we're trying to
-      // create the lock is already deleted by another region server, meaning
-      // that the transfer already occurred.
-      // NoNode => transfer is done and znodes are already deleted
-      // NodeExists => lock znode already created by another RS
-      if (e instanceof KeeperException.NoNodeException ||
-          e instanceof KeeperException.NodeExistsException) {
-        LOG.info("Won't transfer the queue," +
-            " another RS took care of it because of: " + e.getMessage());
-      } else {
-        LOG.info("Failed lock other rs", e);
-      }
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * It "atomically" copies all the hlogs queues from another region server and returns them all
-   * sorted per peer cluster (appended with the dead server's znode).
-   * @param znode
-   * @return HLog queues sorted per peer cluster
-   */
-  public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
-    String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
-    List<String> peerIdsToProcess = null;
-    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-    try {
-      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
-      if (peerIdsToProcess == null) return queues; // node already processed
-      for (String peerId : peerIdsToProcess) {
-        String newPeerId = peerId + "-" + znode;
-        String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
-        // check the logs queue for the old peer cluster
-        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
-        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
-        if (hlogs == null || hlogs.size() == 0) {
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-          continue; // empty log queue.
-        }
-        // create the new cluster znode
-        SortedSet<String> logQueue = new TreeSet<String>();
-        queues.put(newPeerId, logQueue);
-        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
-        listOfOps.add(op);
-        // get the offset of the logs and set it to new znodes
-        for (String hlog : hlogs) {
-          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
-          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
-          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
-          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
-          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
-          // add ops for deleting
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
-          logQueue.add(hlog);
-        }
-        // add delete op for peer
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-      }
-      // add delete op for dead rs
-      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
-      LOG.debug(" The multi list size is: " + listOfOps.size());
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-      LOG.info("Atomically moved the dead regionserver logs. ");
-    } catch (KeeperException e) {
-      // Multi call failed; it looks like some other regionserver took away the logs.
-      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      queues.clear();
-    }
-    return queues;
-  }
-
-  /**
-   * This methods copies all the hlogs queues from another region server
-   * and returns them all sorted per peer cluster (appended with the dead
-   * server's znode)
-   * @param znode server names to copy
-   * @return all hlogs for all peers of that cluster, null if an error occurred
-   */
-  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
-    // TODO this method isn't atomic enough, we could start copying and then
-    // TODO fail for some reason and we would end up with znodes we don't want.
-    SortedMap<String,SortedSet<String>> queues =
-        new TreeMap<String,SortedSet<String>>();
-    try {
-      String nodePath = ZKUtil.joinZNode(rsZNode, znode);
-      List<String> clusters =
-        ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
-      // We have a lock znode in there, it will count as one.
-      if (clusters == null || clusters.size() <= 1) {
-        return queues;
-      }
-      // The lock isn't a peer cluster, remove it
-      clusters.remove(RS_LOCK_ZNODE);
-      for (String cluster : clusters) {
-        // We add the name of the recovered RS to the new znode, we can even
-        // do that for queues that were recovered 10 times giving a znode like
-        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
-        String newCluster = cluster+"-"+znode;
-        String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
-        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
-        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
-        // That region server didn't have anything to replicate for this cluster
-        if (hlogs == null || hlogs.size() == 0) {
-          continue;
-        }
-        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
-            HConstants.EMPTY_BYTE_ARRAY);
-        SortedSet<String> logQueue = new TreeSet<String>();
-        queues.put(newCluster, logQueue);
-        for (String hlog : hlogs) {
-          String z = ZKUtil.joinZNode(clusterPath, hlog);
-          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
-          long position = 0;
-          try {
-            position = parseHLogPositionFrom(positionBytes);
-          } catch (DeserializationException e) {
-            LOG.warn("Failed parse of hlog position from the following znode: " + z);
-          }
-          LOG.debug("Creating " + hlog + " with data " + position);
-          String child = ZKUtil.joinZNode(newClusterZnode, hlog);
-          // Position doesn't actually change, we are just deserializing it for
-          // logging, so just use the already serialized version
-          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
-          logQueue.add(hlog);
-        }
-      }
-    } catch (KeeperException e) {
-      this.abortable.abort("Copy queues from rs", e);
-    }
-    return queues;
+  public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
+    return this.replicationQueues.claimQueues(regionserver);
   }
 
   /**
@@ -912,48 +624,10 @@ public class ReplicationZookeeper implem
    * @param peerZnode znode of the peer cluster queue of hlogs to delete
    */
   public void deleteSource(String peerZnode, boolean closeConnection) {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper,
-          ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
-      if (closeConnection) {
-        this.peerClusters.get(peerZnode).getZkw().close();
-        this.peerClusters.remove(peerZnode);
-      }
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed delete of " + peerZnode, e);
-    }
-  }
-
-  /**
-   * Recursive deletion of all znodes in specified rs' znode
-   * @param znode
-   */
-  public void deleteRsQueues(String znode) {
-    String fullpath = ZKUtil.joinZNode(rsZNode, znode);
-    try {
-      List<String> clusters =
-        ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
-      for (String cluster : clusters) {
-        // We'll delete it later
-        if (cluster.equals(RS_LOCK_ZNODE)) {
-          continue;
-        }
-        String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
-        ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
-      }
-      // Finish cleaning up
-      ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
-    } catch (KeeperException e) {
-      if (e instanceof KeeperException.NoNodeException ||
-          e instanceof KeeperException.NotEmptyException) {
-        // Testing a special case where another region server was able to
-        // create a lock just after we deleted it, but then was also able to
-        // delete the RS znode before us or its lock znode is still there.
-        if (e.getPath().equals(fullpath)) {
-          return;
-        }
-      }
-      this.abortable.abort("Failed delete of " + znode, e);
+    this.replicationQueues.removeQueue(peerZnode);
+    if (closeConnection) {
+      this.peerClusters.get(peerZnode).getZkw().close();
+      this.peerClusters.remove(peerZnode);
     }
   }
 
@@ -961,16 +635,7 @@ public class ReplicationZookeeper implem
    * Delete this cluster's queues
    */
   public void deleteOwnRSZNode() {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper,
-          this.rsServerNameZnode);
-    } catch (KeeperException e) {
-      // if the znode is already expired, don't bother going further
-      if (e instanceof KeeperException.SessionExpiredException) {
-        return;
-      }
-      this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
-    }
+    this.replicationQueues.removeAllQueues();
   }
 
   /**
@@ -978,22 +643,10 @@ public class ReplicationZookeeper implem
    * @param peerId znode of the peer cluster
    * @param hlog name of the hlog
    * @return the position in that hlog
-   * @throws KeeperException 
+   * @throws KeeperException
    */
-  public long getHLogRepPosition(String peerId, String hlog)
-  throws KeeperException {
-    String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
-    String znode = ZKUtil.joinZNode(clusterZnode, hlog);
-    byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
-    try {
-      return parseHLogPositionFrom(bytes);
-    } catch (DeserializationException de) {
-      LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
-          + "znode content, continuing.");
-    }
-    // if we can not parse the position, start at the beginning of the hlog file
-    // again
-    return 0;
+  public long getHLogRepPosition(String peerId, String hlog) throws KeeperException {
+    return this.replicationQueues.getLogPosition(peerId, hlog);
   }
 
   /**
@@ -1051,7 +704,7 @@ public class ReplicationZookeeper implem
   public Map<String, ReplicationPeer> getPeerClusters() {
     return this.peerClusters;
   }
-  
+
   /**
    * Determine if a ZK path points to a peer node.
    * @param path path to be checked

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Apr  4 03:49:47 2013
@@ -27,7 +27,10 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
+import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -43,8 +46,10 @@ import java.util.Set;
 @InterfaceAudience.Private
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
-  private ReplicationZookeeper zkHelper;
-  private Set<String> hlogs = new HashSet<String>();
+  private ZooKeeperWatcher zkw;
+  private ReplicationQueuesClient replicationQueues;
+  private ReplicationStateInterface replicationState;
+  private final Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
 
@@ -53,7 +58,7 @@ public class ReplicationLogCleaner exten
   public boolean isLogDeletable(Path filePath) {
 
     try {
-      if (!zkHelper.getReplication()) {
+      if (!replicationState.getState()) {
         return false;
       }
     } catch (KeeperException e) {
@@ -89,20 +94,20 @@ public class ReplicationLogCleaner exten
   private boolean refreshHLogsAndSearch(String searchedLog) {
     this.hlogs.clear();
     final boolean lookForLog = searchedLog != null;
-    List<String> rss = zkHelper.getListOfReplicators();
+    List<String> rss = replicationQueues.getListOfReplicators();
     if (rss == null) {
       LOG.debug("Didn't find any region server that replicates, deleting: " +
           searchedLog);
       return false;
     }
     for (String rs: rss) {
-      List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
+      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
       // if rs just died, this will be null
       if (listOfPeers == null) {
         continue;
       }
       for (String id : listOfPeers) {
-        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
+        List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
         if (peersHlogs != null) {
           this.hlogs.addAll(peersHlogs);
         }
@@ -128,8 +133,9 @@ public class ReplicationLogCleaner exten
     Configuration conf = new Configuration(config);
     super.setConf(conf);
     try {
-      ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
-      this.zkHelper = new ReplicationZookeeper(this, conf, zkw);
+      this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
+      this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
+      this.replicationState = new ReplicationStateImpl(zkw, conf, this);
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
@@ -143,9 +149,17 @@ public class ReplicationLogCleaner exten
   public void stop(String why) {
     if (this.stopped) return;
     this.stopped = true;
-    if (this.zkHelper != null) {
-      LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
-      this.zkHelper.getZookeeperWatcher().close();
+    if (this.zkw != null) {
+      LOG.info("Stopping " + this.zkw);
+      this.zkw.close();
+    }
+    if (this.replicationState != null) {
+      LOG.info("Stopping " + this.replicationState);
+      try {
+        this.replicationState.close();
+      } catch (IOException e) {
+        LOG.error("Error while stopping " + this.replicationState, e);
+      }
     }
     // Not sure why we're deleting a connection that we never acquired or used
     HConnectionManager.deleteConnection(this.getConf());

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Apr  4 03:49:47 2013
@@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -64,6 +66,7 @@ public class Replication implements WALA
   private ReplicationSourceManager replicationManager;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
   private ReplicationZookeeper zkHelper;
+  private ReplicationQueues replicationQueues;
   private Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
@@ -104,18 +107,23 @@ public class Replication implements WALA
     if (replication) {
       try {
         this.zkHelper = new ReplicationZookeeper(server, this.replicating);
+        this.replicationQueues =
+            new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
+        this.replicationQueues.init(this.server.getServerName().toString());
       } catch (KeeperException ke) {
         throw new IOException("Failed replication handler create " +
            "(replicating=" + this.replicating, ke);
       }
-      this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
-          this.replicating, logDir, oldLogDir);
+      this.replicationManager =
+          new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
+              this.replicating, logDir, oldLogDir);
       this.statsThreadPeriod =
           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
     } else {
       this.replicationManager = null;
       this.zkHelper = null;
+      this.replicationQueues = null;
     }
   }
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Apr  4 03:49:47 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -73,6 +73,7 @@ public class ReplicationSourceManager {
   private final AtomicBoolean replicating;
   // Helper for zookeeper
   private final ReplicationZookeeper zkHelper;
+  private final ReplicationQueues replicationQueues;
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently tracking
@@ -91,14 +92,14 @@ public class ReplicationSourceManager {
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
   private final ThreadPoolExecutor executor;
-  
+
   private final Random rand;
 
 
   /**
-   * Creates a replication manager and sets the watch on all the other
-   * registered region servers
+   * Creates a replication manager and sets the watch on all the other registered region servers
    * @param zkHelper the zk helper for replication
+   * @param replicationQueues the interface for manipulating replication queues
    * @param conf the configuration to use
    * @param stopper the stopper object for this region server
    * @param fs the file system to use
@@ -107,15 +108,13 @@ public class ReplicationSourceManager {
    * @param oldLogDir the directory where old logs are archived
    */
   public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
-                                  final Configuration conf,
-                                  final Stoppable stopper,
-                                  final FileSystem fs,
-                                  final AtomicBoolean replicating,
-                                  final Path logDir,
-                                  final Path oldLogDir) {
+      final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
+      final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
+      final Path oldLogDir) {
     this.sources = new ArrayList<ReplicationSourceInterface>();
     this.replicating = replicating;
     this.zkHelper = zkHelper;
+    this.replicationQueues = replicationQueues;
     this.stopper = stopper;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
@@ -181,7 +180,7 @@ public class ReplicationSourceManager {
     for (String id : this.zkHelper.getPeerClusters().keySet()) {
       addSource(id);
     }
-    List<String> currentReplicators = this.zkHelper.getListOfReplicators();
+    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
     if (currentReplicators == null || currentReplicators.size() == 0) {
       return;
     }
@@ -350,13 +349,12 @@ public class ReplicationSourceManager {
    * It creates one old source for any type of source of the old rs.
    * @param rsZnode
    */
-  public void transferQueues(String rsZnode) {
+  private void transferQueues(String rsZnode) {
     NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
     try {
       this.executor.execute(transfer);
     } catch (RejectedExecutionException ex) {
-      LOG.info("Cancelling the transfer of " + rsZnode +
-          " because of " + ex.getMessage());
+      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
     }
   }
 
@@ -589,20 +587,12 @@ public class ReplicationSourceManager {
       }
       SortedMap<String, SortedSet<String>> newQueues = null;
 
-      // check whether there is multi support. If yes, use it.
-      if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
-        LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
-        newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
-      } else {
-        LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-        if (!zkHelper.lockOtherRS(rsZnode)) {
-          return;
-        }
-        newQueues = zkHelper.copyQueuesFromRS(rsZnode);
-        zkHelper.deleteRsQueues(rsZnode);
-      }
-      // process of copying over the failed queue is completed.
+      newQueues = zkHelper.claimQueues(rsZnode);
+
+      // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
+        // We either didn't get the lock or the failed region server didn't have any outstanding
+        // HLogs to replicate, so we are done.
         return;
       }
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java?rev=1464276&r1=1464275&r2=1464276&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java Thu Apr  4 03:49:47 2013
@@ -17,15 +17,10 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -49,9 +44,7 @@ public class TestReplicationAdmin {
   private final String ID_SECOND = "2";
   private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
 
-  private static ReplicationSourceManager manager;
   private static ReplicationAdmin admin;
-  private static AtomicBoolean replicating = new AtomicBoolean(true);
 
   /**
    * @throws java.lang.Exception
@@ -62,19 +55,6 @@ public class TestReplicationAdmin {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     admin = new ReplicationAdmin(conf);
-    Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
-    Path logDir = new Path(TEST_UTIL.getDataTestDir(),
-        HConstants.HREGION_LOGDIR_NAME);
-    manager = new ReplicationSourceManager(admin.getReplicationZk(), conf,
-        // The following stopper never stops so that we can respond
-        // to zk notification
-        new Stoppable() {
-          @Override
-          public void stop(String why) {}
-          @Override
-          public boolean isStopped() {return false;}
-        }, FileSystem.get(conf), replicating, logDir, oldLogDir);
   }
 
   /**
@@ -84,7 +64,6 @@ public class TestReplicationAdmin {
    */
   @Test
   public void testAddRemovePeer() throws Exception {
-    assertEquals(0, manager.getSources().size());
     // Add a valid peer
     admin.addPeer(ID_ONE, KEY_ONE);
     // try adding the same (fails)

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+
+/**
+ * White-box tests for the replication state interfaces ({@link ReplicationQueues} and
+ * {@link ReplicationQueuesClient}). This class is abstract: each concrete subclass supplies an
+ * implementation by assigning {@link #rq1}, {@link #rq2}, {@link #rq3} and {@link #rqc} before
+ * every test. The assertions assume every test starts with no replicators and no queues, so a
+ * subclass must also wipe the shared backing state between tests.
+ */
+public abstract class TestReplicationStateBasic {
+
+  // Queue handles for three region servers; the tests bind them to server1, server2 and server3
+  // respectively through init(...).
+  protected ReplicationQueues rq1;
+  protected ReplicationQueues rq2;
+  protected ReplicationQueues rq3;
+  // Client used to query the queues written through rq1..rq3.
+  protected ReplicationQueuesClient rqc;
+  // Server names in ServerName string form (host, port 1234, start code -1L).
+  protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
+  protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
+  protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
+
+  /**
+   * Exercises the {@link ReplicationQueuesClient} queries, first against empty state and then
+   * against queues written through {@link #rq1} and {@link #rq2}.
+   */
+  @Test
+  public void testReplicationQueuesClient() throws KeeperException {
+    // Test methods with empty state: there are no replicators yet, and lookups for an unknown
+    // server return null rather than an empty list.
+    assertEquals(0, rqc.getListOfReplicators().size());
+    assertNull(rqc.getLogsInQueue(server1, "qId1"));
+    assertNull(rqc.getAllQueues(server1));
+
+    /*
+     * Set up data. Two replicators:
+     *   - server1: three queues (qId1, qId2, qId3) with 0, 1 and 2 log files respectively
+     *   - server2: zero queues
+     * An empty queue (qId1) and a removed queue ("trash" on server2) are produced by adding an
+     * entry and then removing it again.
+     */
+    rq1.init(server1);
+    rq2.init(server2);
+    rq1.addLog("qId1", "trash");
+    rq1.removeLog("qId1", "trash");
+    rq1.addLog("qId2", "filename1");
+    rq1.addLog("qId3", "filename2");
+    rq1.addLog("qId3", "filename3");
+    rq2.addLog("trash", "trash");
+    rq2.removeQueue("trash");
+
+    // Both servers are listed as replicators, including server2 whose only queue was removed.
+    List<String> reps = rqc.getListOfReplicators();
+    assertEquals(2, reps.size());
+    assertTrue(server1, reps.contains(server1));
+    assertTrue(server2, reps.contains(server2));
+
+    // An unknown server or unknown queue yields null; an existing but emptied queue yields an
+    // empty list.
+    assertNull(rqc.getLogsInQueue("bogus", "bogus"));
+    assertNull(rqc.getLogsInQueue(server1, "bogus"));
+    assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
+    assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
+    assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
+
+    // server2 is still a replicator but owns no queues; server1 owns all three queues.
+    // NOTE(review): only qId2 and qId3 are checked by name; qId1 is covered only by the size
+    // check.
+    assertNull(rqc.getAllQueues("bogus"));
+    assertEquals(0, rqc.getAllQueues(server2).size());
+    List<String> list = rqc.getAllQueues(server1);
+    assertEquals(3, list.size());
+    assertTrue(list.contains("qId2"));
+    assertTrue(list.contains("qId3"));
+  }
+
+  /**
+   * Exercises {@link ReplicationQueues}. It covers behavior on empty state, log bookkeeping and
+   * positions, and claiming the queues of another (presumably failed) region server.
+   */
+  @Test
+  public void testReplicationQueues() throws KeeperException {
+    rq1.init(server1);
+    rq2.init(server2);
+    rq3.init(server3);
+
+    // Zero queues or replicators exist. Calling init() alone does not register a replicator, and
+    // the remove calls on nonexistent queues/logs are expected to complete without throwing.
+    assertEquals(0, rq1.getListOfReplicators().size());
+    rq1.removeQueue("bogus");
+    rq1.removeLog("bogus", "bogus");
+    rq1.removeAllQueues();
+    assertNull(rq1.getAllQueues());
+    // TODO fix NPE if getting a log position on a file that does not exist
+    // assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
+    assertNull(rq1.getLogsInQueue("bogus"));
+    // Claiming a server that has no replication state yields an empty map.
+    assertEquals(0, rq1.claimQueues(new ServerName("bogus", 1234, -1L).toString()).size());
+
+    // TODO test setting a log position on a bogus file
+    // rq1.setLogPosition("bogus", "bogus", 5L);
+
+    populateQueues();
+
+    // populateQueues() registered all three servers as replicators. A newly added log reports
+    // position 0, and setLogPosition() overwrites it.
+    assertEquals(3, rq1.getListOfReplicators().size());
+    assertEquals(0, rq2.getLogsInQueue("qId1").size());
+    assertEquals(5, rq3.getLogsInQueue("qId5").size());
+    assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
+    rq3.setLogPosition("qId5", "filename4", 354L);
+    assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
+
+    // Updating a position does not change the queue contents.
+    assertEquals(5, rq3.getLogsInQueue("qId5").size());
+    assertEquals(0, rq2.getLogsInQueue("qId1").size());
+    assertEquals(0, rq1.getAllQueues().size());
+    assertEquals(1, rq2.getAllQueues().size());
+    assertEquals(5, rq3.getAllQueues().size());
+
+    // server1 had no queues, so rq3 claims nothing. server1 is nevertheless no longer listed
+    // as a replicator afterwards.
+    assertEquals(0, rq3.claimQueues(server1).size());
+    assertEquals(2, rq3.getListOfReplicators().size());
+
+    // rq2 takes over all five of server3's queues, leaving server2 as the only replicator.
+    SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
+    assertEquals(5, queues.size());
+    assertEquals(1, rq2.getListOfReplicators().size());
+
+    // TODO test claimQueues on yourself
+    // rq2.claimQueues(server2);
+
+    // rq2 now owns its own qId1 plus the five claimed queues.
+    assertEquals(6, rq2.getAllQueues().size());
+
+    rq2.removeAllQueues();
+
+    // Removing all of rq2's queues leaves no replicators at all.
+    assertEquals(0, rq2.getListOfReplicators().size());
+  }
+
+  /**
+   * Populates three replicators:
+   * <ul>
+   * <li>rq1: zero queues (a "trash" queue is created and then removed)</li>
+   * <li>rq2: one queue, qId1, with no logs (a "trash" log is added and then removed)</li>
+   * <li>rq3: five queues qId1..qId5. Queue qId<i>i</i> holds logs filename0..filename(<i>i</i>-1),
+   * i.e. 1, 2, 3, 4 and 5 log files respectively</li>
+   * </ul>
+   * The rq handles are expected to have been init()'ed by the caller.
+   * @throws KeeperException if the underlying implementation raises it
+   */
+  protected void populateQueues() throws KeeperException {
+    rq1.addLog("trash", "trash");
+    rq1.removeQueue("trash");
+
+    rq2.addLog("qId1", "trash");
+    rq2.removeLog("qId1", "trash");
+
+    for (int i = 1; i < 6; i++) {
+      for (int j = 0; j < i; j++) {
+        rq3.addLog("qId" + i, "filename" + j);
+      }
+    }
+  }
+}
+

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java?rev=1464276&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java Thu Apr  4 03:49:47 2013
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the {@link TestReplicationStateBasic} suite against the ZooKeeper-backed implementations
+ * ({@link ReplicationQueuesZKImpl} and {@link ReplicationQueuesClientZKImpl}). The tests use a
+ * mini ZooKeeper cluster that is started once for the whole class.
+ */
+@Category(MediumTests.class)
+public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
+
+  // Shared by all tests and all DummyServer instances; assigned in setUpBeforeClass().
+  private static Configuration conf;
+  private static HBaseTestingUtility utility;
+  private static ZooKeeperWatcher zkw;
+
+  /** Starts the mini ZooKeeper cluster and obtains the shared configuration and watcher. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniZKCluster();
+    conf = utility.getConfiguration();
+    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+  }
+
+  /**
+   * Creates fresh queue handles for each test. rq1, rq2 and rq3 are bound to dummy servers named
+   * server1, server2 and server3, and all of them share the single ZooKeeper watcher. rqc is
+   * created with the server1 dummy.
+   */
+  @Before
+  public void setUp() throws KeeperException {
+    DummyServer ds1 = new DummyServer(server1);
+    DummyServer ds2 = new DummyServer(server2);
+    DummyServer ds3 = new DummyServer(server3);
+    rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
+    rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
+    rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
+    rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
+  }
+
+  /**
+   * Recursively deletes the replication znode so the next test starts from empty replication
+   * state. The znode lives under the base znode and is named by "zookeeper.znode.replication"
+   * (default "replication").
+   */
+  @After
+  public void tearDown() throws KeeperException, IOException {
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
+    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+  }
+
+  /** Shuts down the mini ZooKeeper cluster started in setUpBeforeClass(). */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // NOTE(review): zkw is not closed explicitly here; confirm whether HBaseTestingUtility owns
+    // its lifecycle.
+    utility.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Minimal {@link Server} stub. It reports a fixed server name and hands out the shared test
+   * configuration and ZooKeeper watcher. abort() and stop() only record that they were called,
+   * and getCatalogTracker() returns null.
+   */
+  static class DummyServer implements Server {
+    // String form of the server name; parsed into a ServerName on each getServerName() call.
+    private String serverName;
+    private boolean isAborted = false;
+    private boolean isStopped = false;
+
+    public DummyServer(String serverName) {
+      this.serverName = serverName;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      // A new ServerName instance is built from the stored string on every call.
+      return new ServerName(this.serverName);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      // Only records the abort; the reason and cause are ignored.
+      this.isAborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.isAborted;
+    }
+
+    @Override
+    public void stop(String why) {
+      // Only records the stop request; the reason is ignored.
+      this.isStopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return this.isStopped;
+    }
+  }
+}
+



Mime
View raw message