Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 990C7F924 for ; Sat, 6 Apr 2013 06:10:42 +0000 (UTC) Received: (qmail 63085 invoked by uid 500); 6 Apr 2013 06:10:41 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 62923 invoked by uid 500); 6 Apr 2013 06:10:41 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 62848 invoked by uid 99); 6 Apr 2013 06:10:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 06:10:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 06:10:26 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6EA092388994; Sat, 6 Apr 2013 06:09:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1465201 [2/2] - in /hbase/hbase.apache.org/trunk: xref-test/org/apache/hadoop/hbase/replication/ xref/org/apache/hadoop/hbase/replication/ Date: Sat, 06 Apr 2013 06:09:46 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130406060946.6EA092388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.html URL: http://svn.apache.org/viewvc/hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.html?rev=1465201&view=auto ============================================================================== --- hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.html (added) +++ hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.html Sat Apr 6 06:09:45 2013 @@ -0,0 +1,417 @@ + + + + +ReplicationQueuesZKImpl xref + + + +
+
+1   /*
+2    *
+3    * Licensed to the Apache Software Foundation (ASF) under one
+4    * or more contributor license agreements.  See the NOTICE file
+5    * distributed with this work for additional information
+6    * regarding copyright ownership.  The ASF licenses this file
+7    * to you under the Apache License, Version 2.0 (the
+8    * "License"); you may not use this file except in compliance
+9    * with the License.  You may obtain a copy of the License at
+10   *
+11   *     http://www.apache.org/licenses/LICENSE-2.0
+12   *
+13   * Unless required by applicable law or agreed to in writing, software
+14   * distributed under the License is distributed on an "AS IS" BASIS,
+15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+16   * See the License for the specific language governing permissions and
+17   * limitations under the License.
+18   */
+19  package org.apache.hadoop.hbase.replication;
+20  
+21  import java.util.ArrayList;
+22  import java.util.List;
+23  import java.util.SortedMap;
+24  import java.util.SortedSet;
+25  import java.util.TreeMap;
+26  import java.util.TreeSet;
+27  
+28  import org.apache.commons.logging.Log;
+29  import org.apache.commons.logging.LogFactory;
+30  import org.apache.hadoop.conf.Configuration;
+31  import org.apache.hadoop.hbase.Abortable;
+32  import org.apache.hadoop.hbase.HConstants;
+33  import org.apache.hadoop.hbase.exceptions.DeserializationException;
+34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+35  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+36  import org.apache.hadoop.hbase.util.Bytes;
+37  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+38  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+39  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+40  import org.apache.zookeeper.KeeperException;
+41  
+42  import com.google.protobuf.InvalidProtocolBufferException;
+43  
+44  public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
+45  
+46    /** Znode containing all replication queues for this region server. */
+47    private String myQueuesZnode;
+48    /** Name of znode we use to lock during failover */
+49    private final static String RS_LOCK_ZNODE = "lock";
+50  
+51    private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+52  
+53    public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
+54        throws KeeperException {
+55      super(zk, conf, abortable);
+56    }
+57  
+58    @Override
+59    public void init(String serverName) {
+60      this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+61    }
+62  
+63    @Override
+64    public void removeQueue(String queueId) {
+65      try {
+66        ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
+67      } catch (KeeperException e) {
+68        this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
+69      }
+70    }
+71  
+72    @Override
+73    public void addLog(String queueId, String filename) throws KeeperException {
+74      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+75      znode = ZKUtil.joinZNode(znode, filename);
+76      ZKUtil.createWithParents(this.zookeeper, znode);
+77    }
+78  
+79    @Override
+80    public void removeLog(String queueId, String filename) {
+81      try {
+82        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+83        znode = ZKUtil.joinZNode(znode, filename);
+84        ZKUtil.deleteNode(this.zookeeper, znode);
+85      } catch (KeeperException e) {
+86        this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename="
+87            + filename + ")", e);
+88      }
+89    }
+90  
+91    @Override
+92    public void setLogPosition(String queueId, String filename, long position) {
+93      try {
+94        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+95        znode = ZKUtil.joinZNode(znode, filename);
+96        // Why serialize String of Long and not Long as bytes?
+97        ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+98      } catch (KeeperException e) {
+99        this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+100           + ", position=" + position + ")", e);
+101     }
+102   }
+103 
+104   @Override
+105   public long getLogPosition(String queueId, String filename) throws KeeperException {
+106     String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+107     String znode = ZKUtil.joinZNode(clusterZnode, filename);
+108     byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+109     try {
+110       return parseHLogPositionFrom(bytes);
+111     } catch (DeserializationException de) {
+112       LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+113           + "znode content, continuing.");
+114     }
+115     // if we can not parse the position, start at the beginning of the hlog file
+116     // again
+117     return 0;
+118   }
+119 
+120   @Override
+121   public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
+122     SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
+123     // check whether there is multi support. If yes, use it.
+124     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+125       LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue");
+126       newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
+127     } else {
+128       LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue");
+129       if (!lockOtherRS(regionserverZnode)) {
+130         return newQueues;
+131       }
+132       newQueues = copyQueuesFromRS(regionserverZnode);
+133       deleteAnotherRSQueues(regionserverZnode);
+134     }
+135     return newQueues;
+136   }
+137 
+138   @Override
+139   public void removeAllQueues() {
+140     try {
+141       ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
+142     } catch (KeeperException e) {
+143       // if the znode is already expired, don't bother going further
+144       if (e instanceof KeeperException.SessionExpiredException) {
+145         return;
+146       }
+147       this.abortable.abort("Failed to delete replication queues for region server: "
+148           + this.myQueuesZnode, e);
+149     }
+150   }
+151 
+152   @Override
+153   public List<String> getLogsInQueue(String queueId) {
+154     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+155     List<String> result = null;
+156     try {
+157       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+158     } catch (KeeperException e) {
+159       this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e);
+160     }
+161     return result;
+162   }
+163 
+164   @Override
+165   public List<String> getAllQueues() {
+166     List<String> listOfQueues = null;
+167     try {
+168       listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
+169     } catch (KeeperException e) {
+170       this.abortable.abort("Failed to get a list of queues for region server: "
+171           + this.myQueuesZnode, e);
+172     }
+173     return listOfQueues;
+174   }
+175 
+176   /**
+177    * Try to set a lock in another region server's znode.
+178    * @param znode the server names of the other server
+179    * @return true if the lock was acquired, false in every other cases
+180    */
+181   private boolean lockOtherRS(String znode) {
+182     try {
+183       String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+184       if (parent.equals(this.myQueuesZnode)) {
+185         LOG.warn("Won't lock because this is us, we're dead!");
+186         return false;
+187       }
+188       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+189       ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
+190     } catch (KeeperException e) {
+191       // This exception will pop up if the znode under which we're trying to
+192       // create the lock is already deleted by another region server, meaning
+193       // that the transfer already occurred.
+194       // NoNode => transfer is done and znodes are already deleted
+195       // NodeExists => lock znode already created by another RS
+196       if (e instanceof KeeperException.NoNodeException
+197           || e instanceof KeeperException.NodeExistsException) {
+198         LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
+199             + e.getMessage());
+200       } else {
+201         LOG.info("Failed lock other rs", e);
+202       }
+203       return false;
+204     }
+205     return true;
+206   }
+207 
+208   /**
+209    * Delete all the replication queues for a given region server.
+210    * @param regionserverZnode The znode of the region server to delete.
+211    */
+212   private void deleteAnotherRSQueues(String regionserverZnode) {
+213     String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
+214     try {
+215       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
+216       for (String cluster : clusters) {
+217         // No need to delete, it will be deleted later.
+218         if (cluster.equals(RS_LOCK_ZNODE)) {
+219           continue;
+220         }
+221         String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
+222         ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
+223       }
+224       // Finish cleaning up
+225       ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
+226     } catch (KeeperException e) {
+227       if (e instanceof KeeperException.NoNodeException
+228           || e instanceof KeeperException.NotEmptyException) {
+229         // Testing a special case where another region server was able to
+230         // create a lock just after we deleted it, but then was also able to
+231         // delete the RS znode before us or its lock znode is still there.
+232         if (e.getPath().equals(fullpath)) {
+233           return;
+234         }
+235       }
+236       this.abortable.abort("Failed to delete replication queues for region server: "
+237           + regionserverZnode, e);
+238     }
+239   }
+240 
+241   /**
+242    * It "atomically" copies all the hlogs queues from another region server and returns them all
+243    * sorted per peer cluster (appended with the dead server's znode).
+244    * @param znode pertaining to the region server to copy the queues from
+245    * @return HLog queues sorted per peer cluster
+246    */
+247   private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+248     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+249     // hbase/replication/rs/deadrs
+250     String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+251     List<String> peerIdsToProcess = null;
+252     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+253     try {
+254       peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+255       if (peerIdsToProcess == null) return queues; // node already processed
+256       for (String peerId : peerIdsToProcess) {
+257         String newPeerId = peerId + "-" + znode;
+258         String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
+259         // check the logs queue for the old peer cluster
+260         String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+261         List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+262         if (hlogs == null || hlogs.size() == 0) {
+263           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+264           continue; // empty log queue.
+265         }
+266         // create the new cluster znode
+267         SortedSet<String> logQueue = new TreeSet<String>();
+268         queues.put(newPeerId, logQueue);
+269         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+270         listOfOps.add(op);
+271         // get the offset of the logs and set it to new znodes
+272         for (String hlog : hlogs) {
+273           String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+274           byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+275           LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+276           String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+277           listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+278           // add ops for deleting
+279           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+280           logQueue.add(hlog);
+281         }
+282         // add delete op for peer
+283         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+284       }
+285       // add delete op for dead rs
+286       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+287       LOG.debug(" The multi list size is: " + listOfOps.size());
+288       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+289       LOG.info("Atomically moved the dead regionserver logs. ");
+290     } catch (KeeperException e) {
+291       // Multi call failed; it looks like some other regionserver took away the logs.
+292       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+293       queues.clear();
+294     }
+295     return queues;
+296   }
+297 
+298   /**
+299    * This methods copies all the hlogs queues from another region server and returns them all sorted
+300    * per peer cluster (appended with the dead server's znode)
+301    * @param znode server names to copy
+302    * @return all hlogs for all peers of that cluster, null if an error occurred
+303    */
+304   private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+305     // TODO this method isn't atomic enough, we could start copying and then
+306     // TODO fail for some reason and we would end up with znodes we don't want.
+307     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+308     try {
+309       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+310       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+311       // We have a lock znode in there, it will count as one.
+312       if (clusters == null || clusters.size() <= 1) {
+313         return queues;
+314       }
+315       // The lock isn't a peer cluster, remove it
+316       clusters.remove(RS_LOCK_ZNODE);
+317       for (String cluster : clusters) {
+318         // We add the name of the recovered RS to the new znode, we can even
+319         // do that for queues that were recovered 10 times giving a znode like
+320         // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+321         String newCluster = cluster + "-" + znode;
+322         String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
+323         String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+324         List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+325         // That region server didn't have anything to replicate for this cluster
+326         if (hlogs == null || hlogs.size() == 0) {
+327           continue;
+328         }
+329         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+330           HConstants.EMPTY_BYTE_ARRAY);
+331         SortedSet<String> logQueue = new TreeSet<String>();
+332         queues.put(newCluster, logQueue);
+333         for (String hlog : hlogs) {
+334           String z = ZKUtil.joinZNode(clusterPath, hlog);
+335           byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+336           long position = 0;
+337           try {
+338             position = parseHLogPositionFrom(positionBytes);
+339           } catch (DeserializationException e) {
+340             LOG.warn("Failed parse of hlog position from the following znode: " + z);
+341           }
+342           LOG.debug("Creating " + hlog + " with data " + position);
+343           String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+344           // Position doesn't actually change, we are just deserializing it for
+345           // logging, so just use the already serialized version
+346           ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
+347           logQueue.add(hlog);
+348         }
+349       }
+350     } catch (KeeperException e) {
+351       this.abortable.abort("Copy queues from rs", e);
+352     }
+353     return queues;
+354   }
+355 
+356   /**
+357    * @param lockOwner
+358    * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
+359    *         for use as content of an replication lock during region server fail over.
+360    */
+361   static byte[] lockToByteArray(final String lockOwner) {
+362     byte[] bytes =
+363         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
+364     return ProtobufUtil.prependPBMagic(bytes);
+365   }
+366 
+367   /**
+368    * @param position
+369    * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
+370    *         for use as content of an hlog position in a replication queue.
+371    */
+372   static byte[] toByteArray(final long position) {
+373     byte[] bytes =
+374         ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
+375             .toByteArray();
+376     return ProtobufUtil.prependPBMagic(bytes);
+377   }
+378 
+379   /**
+380    * @param bytes - Content of a HLog position znode.
+381    * @return long - The current HLog position.
+382    * @throws DeserializationException
+383    */
+384   private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+385     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+386       int pblen = ProtobufUtil.lengthOfPBMagic();
+387       ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
+388           ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
+389       ZooKeeperProtos.ReplicationHLogPosition position;
+390       try {
+391         position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+392       } catch (InvalidProtocolBufferException e) {
+393         throw new DeserializationException(e);
+394       }
+395       return position.getPosition();
+396     } else {
+397       if (bytes.length > 0) {
+398         return Bytes.toLong(bytes);
+399       }
+400       return 0;
+401     }
+402   }
+403 }
+
+
+ + Added: hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.html URL: http://svn.apache.org/viewvc/hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.html?rev=1465201&view=auto ============================================================================== --- hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.html (added) +++ hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.html Sat Apr 6 06:09:45 2013 @@ -0,0 +1,95 @@ + + + + +ReplicationStateZKBase xref + + + +
+
+1   /*
+2    *
+3    * Licensed to the Apache Software Foundation (ASF) under one
+4    * or more contributor license agreements.  See the NOTICE file
+5    * distributed with this work for additional information
+6    * regarding copyright ownership.  The ASF licenses this file
+7    * to you under the Apache License, Version 2.0 (the
+8    * "License"); you may not use this file except in compliance
+9    * with the License.  You may obtain a copy of the License at
+10   *
+11   *     http://www.apache.org/licenses/LICENSE-2.0
+12   *
+13   * Unless required by applicable law or agreed to in writing, software
+14   * distributed under the License is distributed on an "AS IS" BASIS,
+15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+16   * See the License for the specific language governing permissions and
+17   * limitations under the License.
+18   */
+19  package org.apache.hadoop.hbase.replication;
+20  
+21  import java.util.List;
+22  
+23  import org.apache.hadoop.conf.Configuration;
+24  import org.apache.hadoop.hbase.Abortable;
+25  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+26  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+27  import org.apache.zookeeper.KeeperException;
+28  
+29  
+30  /**
+31   * This is a base class for maintaining replication state in zookeeper.
+32   */
+33  public abstract class ReplicationStateZKBase {
+34  
+35    /**
+36     * The name of the znode that contains the replication status of a remote slave (i.e. peer)
+37     * cluster.
+38     */
+39    protected final String peerStateNodeName;
+40    /** The name of the znode that contains the replication status of the local cluster. */
+41    protected final String stateZNode;
+42    /** The name of the base znode that contains all replication state. */
+43    protected final String replicationZNode;
+44    /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
+45    protected final String peersZNode;
+46    /** The name of the znode that contains all replication queues */
+47    protected final String queuesZNode;
+48    /** The cluster key of the local cluster */
+49    protected final String ourClusterKey;
+50    protected final ZooKeeperWatcher zookeeper;
+51    protected final Configuration conf;
+52    protected final Abortable abortable;
+53  
+54    public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, 
+55        Abortable abortable) {
+56      this.zookeeper = zookeeper;
+57      this.conf = conf;
+58      this.abortable = abortable;
+59  
+60      String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+61      String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+62      String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+63      String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state");
+64      this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+65      this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+66      this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+67      this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName);
+68      this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+69      this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+70    }
+71  
+72    public List<String> getListOfReplicators() {
+73      List<String> result = null;
+74      try {
+75        result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
+76      } catch (KeeperException e) {
+77        this.abortable.abort("Failed to get list of replicators", e);
+78      }
+79      return result;
+80    }
+81  }
+
+
+ +