From: appy@apache.org
To: commits@hbase.apache.org
Date: Thu, 24 Aug 2017 05:18:37 -0000
Message-Id: <1306c74c4aa64bc5a6359b4f112d876f@git.apache.org>
Subject: [2/4] hbase git commit: HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
new file mode 100644
index 0000000..c4db874
--- /dev/null
+++ b/hbase-replication/pom.xml
@@ -0,0 +1,264 @@
[The XML markup of this 264-line pom.xml did not survive archiving; the recoverable content: the new hbase-replication module ("Apache HBase - Replication", "HBase Replication Support") inherits from the org.apache.hbase:hbase parent at version 2.0.0-alpha3-SNAPSHOT; its build section configures the maven-site, maven-assembly, maven-surefire (a secondPartTestsExecution execution in the test phase), and maven-source plugins, plus an org.eclipse.m2e lifecycle-mapping entry for the maven-compiler-plugin; it declares dependencies on hbase-annotations (excluding jdk.tools) and its test-jar, hbase-client, hbase-common and its test-jar, commons-codec, commons-io, commons-lang, commons-logging, protobuf-java, zookeeper, and log4j (test scope); and it defines a hadoop-2.0 profile (active when hadoop.profile is unset; adds findbugs-annotations, hadoop-auth, and hadoop-common with jets3t/jsp-api/jetty/jersey/servlet-api/jasper/jsr305 exclusions) and a hadoop-3.0 profile (hadoop.profile=3.0, hadoop version 3.0-SNAPSHOT; adds hadoop-auth and hadoop-common).]

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
new file mode 100644
index 0000000..8506cbb
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.reflect.ConstructorUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * A factory class for instantiating replication objects that deal with replication state.
+ */
+@InterfaceAudience.Private
+public class ReplicationFactory {
+
+  public static final Class<?> defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
+
+  public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
+      throws Exception {
+    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica."
+        + "replication.replicationQueues.class", defaultReplicationQueueClass);
+    return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  public static ReplicationQueuesClient getReplicationQueuesClient(
+      ReplicationQueuesClientArguments args) throws Exception {
+    Class<?> classToBuild = args.getConf().getClass(
+        "hbase.region.replica.replication.replicationQueuesClient.class",
+        ReplicationQueuesClientZKImpl.class);
+    return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    return getReplicationPeers(zk, conf, null, abortable);
+  }
+
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
+  }
+
+  public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
new file mode 100644
index 0000000..dfb5fdc
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The replication listener interface can be implemented if a class needs to subscribe to events + * generated by the ReplicationTracker. These events include things like addition/deletion of peer + * clusters or failure of a local region server. To receive events, the class also needs to register + * itself with a Replication Tracker. + */ +@InterfaceAudience.Private +public interface ReplicationListener { + + /** + * A region server has been removed from the local cluster + * @param regionServer the removed region server + */ + public void regionServerRemoved(String regionServer); + + /** + * A peer cluster has been removed (i.e. unregistered) from replication. + * @param peerId The peer id of the cluster that has been removed + */ + public void peerRemoved(String peerId); + + /** + * The list of registered peer clusters has changed. + * @param peerIds A list of all currently registered peer clusters + */ + public void peerListChanged(List peerIds); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java new file mode 100644 index 0000000..4f18048 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + + +/** + * ReplicationPeer manages enabled / disabled state for the peer. 
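+ * It also exposes the peer's replication configuration, its replicable (table, column-family)
+ * map and namespace set, and its per-node bandwidth limit, via the accessors declared below.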
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
+
+  /**
+   * State of the peer, whether it is enabled or not
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  enum PeerState {
+    ENABLED,
+    DISABLED
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id
+   */
+  String getId();
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  public ReplicationPeerConfig getPeerConfig();
+
+  /**
+   * Returns the state of the peer
+   * @return the enabled state
+   */
+  PeerState getPeerState();
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  public Configuration getConfiguration();
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  public Map<TableName, List<String>> getTableCFs();
+
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  public Set<String> getNamespaces();
+
+  /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth upper limit
+   */
+  public long getPeerBandwidth();
+
+  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
new file mode 100644
index 0000000..4e04186
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface ReplicationPeerConfigListener { + /** Callback method for when users update the ReplicationPeerConfig for this peer + * + * @param rpc The updated ReplicationPeerConfig + */ + void peerConfigUpdated(ReplicationPeerConfig rpc); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java new file mode 100644 index 0000000..3973be9 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -0,0 +1,318 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +@InterfaceAudience.Private +public class ReplicationPeerZKImpl extends ReplicationStateZKBase + implements ReplicationPeer, Abortable, Closeable { + private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); + + private ReplicationPeerConfig peerConfig; + private final String id; + private volatile PeerState peerState; + private volatile Map> tableCFs = new HashMap<>(); + private final Configuration conf; + private PeerStateTracker peerStateTracker; + private PeerConfigTracker peerConfigTracker; + + + /** + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. + * @param conf configuration object to this peer + * @param id string representation of this peer's identifier + * @param peerConfig configuration for the replication peer + */ + public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, + String id, ReplicationPeerConfig peerConfig, + Abortable abortable) + throws ReplicationException { + super(zkWatcher, conf, abortable); + this.conf = conf; + this.peerConfig = peerConfig; + this.id = id; + } + + /** + * start a state tracker to check whether this peer is enabled or not + * + * @param peerStateNode path to zk node which stores peer state + * @throws KeeperException + */ + public void startStateTracker(String peerStateNode) + throws KeeperException { + ensurePeerEnabled(peerStateNode); + this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); + this.peerStateTracker.start(); + try { + this.readPeerStateZnode(); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + } + + private void readPeerStateZnode() throws DeserializationException { + this.peerState = + isStateEnabled(this.peerStateTracker.getData(false)) + ? 
PeerState.ENABLED + : PeerState.DISABLED; + } + + /** + * start a table-cfs tracker to listen the (table, cf-list) map change + * @param peerConfigNode path to zk node which stores table-cfs + * @throws KeeperException + */ + public void startPeerConfigTracker(String peerConfigNode) + throws KeeperException { + this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, + this); + this.peerConfigTracker.start(); + this.readPeerConfig(); + } + + private ReplicationPeerConfig readPeerConfig() { + try { + byte[] data = peerConfigTracker.getData(false); + if (data != null) { + this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data); + } + } catch (DeserializationException e) { + LOG.error("", e); + } + return this.peerConfig; + } + + @Override + public PeerState getPeerState() { + return peerState; + } + + /** + * Get the identifier of this peer + * @return string representation of the id (short) + */ + @Override + public String getId() { + return id; + } + + /** + * Get the peer config object + * @return the ReplicationPeerConfig for this peer + */ + @Override + public ReplicationPeerConfig getPeerConfig() { + return peerConfig; + } + + /** + * Get the configuration object required to communicate with this peer + * @return configuration object + */ + @Override + public Configuration getConfiguration() { + return conf; + } + + /** + * Get replicable (table, cf-list) map of this peer + * @return the replicable (table, cf-list) map + */ + @Override + public Map> getTableCFs() { + this.tableCFs = peerConfig.getTableCFsMap(); + return this.tableCFs; + } + + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + @Override + public Set getNamespaces() { + return this.peerConfig.getNamespaces(); + } + + @Override + public long getPeerBandwidth() { + return this.peerConfig.getBandwidth(); + } + + @Override + public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.setListener(listener); + } + } + + @Override + public void abort(String why, Throwable e) { + LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig + + " was aborted for the following reason(s):" + why, e); + } + + @Override + public boolean isAborted() { + // Currently the replication peer is never "Aborted", we just log when the + // abort method is called. + return false; + } + + @Override + public void close() throws IOException { + // TODO: stop zkw? + } + + /** + * Parse the raw data from ZK to get a peer's state + * @param bytes raw ZK data + * @return True if the passed in bytes are those of a pb serialized ENABLED state. + * @throws DeserializationException + */ + public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { + ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); + return ReplicationProtos.ReplicationState.State.ENABLED == state; + } + + /** + * @param bytes Content of a state znode. + * @return State parsed from the passed bytes. 
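+   * The bytes are expected to begin with HBase's protobuf magic prefix, followed by a serialized
+   * ReplicationProtos.ReplicationState message, as stored in the peer-state znode.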
+ * @throws DeserializationException + */ + private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(bytes); + int pblen = ProtobufUtil.lengthOfPBMagic(); + ReplicationProtos.ReplicationState.Builder builder = + ReplicationProtos.ReplicationState.newBuilder(); + ReplicationProtos.ReplicationState state; + try { + ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + state = builder.build(); + return state.getState(); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + /** + * Utility method to ensure an ENABLED znode is in place; if not present, we create it. + * @param path Path to znode to check + * @return True if we created the znode. + * @throws NodeExistsException + * @throws KeeperException + */ + private boolean ensurePeerEnabled(final String path) + throws NodeExistsException, KeeperException { + if (ZKUtil.checkExists(zookeeper, path) == -1) { + // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the + // peer-state znode. This happens while adding a peer. + // The peer state data is set as "ENABLED" by default. + ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, + ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + return true; + } + return false; + } + + /** + * Tracker for state of this peer + */ + public class PeerStateTracker extends ZooKeeperNodeTracker { + + public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerStateZNode, abortable); + } + + @Override + public synchronized void nodeDataChanged(String path) { + if (path.equals(node)) { + super.nodeDataChanged(path); + try { + readPeerStateZnode(); + } catch (DeserializationException e) { + LOG.warn("Failed deserializing the content of " + path, e); + } + } + } + } + + /** + * Tracker for PeerConfigNode of this peer + */ + public class PeerConfigTracker extends ZooKeeperNodeTracker { + + ReplicationPeerConfigListener listener; + + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerConfigNode, abortable); + } + + public synchronized void setListener(ReplicationPeerConfigListener listener){ + this.listener = listener; + } + + @Override + public synchronized void nodeCreated(String path) { + if (path.equals(node)) { + super.nodeCreated(path); + ReplicationPeerConfig config = readPeerConfig(); + if (listener != null){ + listener.peerConfigUpdated(config); + } + } + } + + @Override + public synchronized void nodeDataChanged(String path) { + //superclass calls nodeCreated + if (path.equals(node)) { + super.nodeDataChanged(path); + } + + } + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java new file mode 100644 index 0000000..2a7963a --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -0,0 +1,177 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Pair; + +/** + * This provides an interface for maintaining a set of peer clusters. These peers are remote slave + * clusters that data is replicated to. A peer cluster can be in three different states: + * + * 1. Not-Registered - There is no notion of the peer cluster. + * 2. Registered - The peer has an id and is being tracked but there is no connection. + * 3. Connected - There is an active connection to the remote peer. + * + * In the registered or connected state, a peer cluster can either be enabled or disabled. + */ +@InterfaceAudience.Private +public interface ReplicationPeers { + + /** + * Initialize the ReplicationPeers interface. + */ + void init() throws ReplicationException; + + /** + * Add a new remote slave cluster for replication. + * @param peerId a short that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + */ + void registerPeer(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException; + + /** + * Removes a remote slave cluster and stops the replication to it. + * @param peerId a short that identifies the cluster + */ + void unregisterPeer(String peerId) throws ReplicationException; + + /** + * Method called after a peer has been connected. It will create a ReplicationPeer to track the + * newly connected cluster. + * @param peerId a short that identifies the cluster + * @return whether a ReplicationPeer was successfully created + * @throws ReplicationException + */ + boolean peerConnected(String peerId) throws ReplicationException; + + /** + * Method called after a peer has been disconnected. It will remove the ReplicationPeer that + * tracked the disconnected cluster. + * @param peerId a short that identifies the cluster + */ + void peerDisconnected(String peerId); + + /** + * Restart the replication to the specified remote slave cluster. + * @param peerId a short that identifies the cluster + */ + void enablePeer(String peerId) throws ReplicationException; + + /** + * Stop the replication to the specified remote slave cluster. + * @param peerId a short that identifies the cluster + */ + void disablePeer(String peerId) throws ReplicationException; + + /** + * Get the table and column-family list string of the peer from the underlying storage. + * @param peerId a short that identifies the cluster + */ + public Map> getPeerTableCFsConfig(String peerId) + throws ReplicationException; + + /** + * Set the table and column-family list string of the peer to the underlying storage. 
+   * @param peerId a short that identifies the cluster
+   * @param tableCFs the table and column-family list which will be replicated for this peer
+   */
+  public void setPeerTableCFsConfig(String peerId,
+      Map<TableName, ? extends Collection<String>> tableCFs)
+      throws ReplicationException;
+
+  /**
+   * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
+   * continue to track changes to the Peer's state and config. This method returns null if no
+   * peer has been connected with the given peerId.
+   * @param peerId id for the peer
+   * @return ReplicationPeer object
+   */
+  ReplicationPeer getConnectedPeer(String peerId);
+
+  /**
+   * Returns the set of peerIds of the clusters that have been connected and have an underlying
+   * ReplicationPeer.
+   * @return a Set of Strings for peerIds
+   */
+  public Set<String> getConnectedPeerIds();
+
+  /**
+   * Get the replication status for the specified connected remote slave cluster.
+   * The value might be read from cache, so it is recommended to
+   * use {@link #getStatusOfPeerFromBackingStore(String)}
+   * if reading the state after enabling or disabling it.
+   * @param peerId a short that identifies the cluster
+   * @return true if replication is enabled, false otherwise.
+   */
+  boolean getStatusOfPeer(String peerId);
+
+  /**
+   * Get the replication status for the specified remote slave cluster, which doesn't
+   * have to be connected. The state is read directly from the backing store.
+   * @param peerId a short that identifies the cluster
+   * @return true if replication is enabled, false otherwise.
+   * @throws ReplicationException thrown if there's an error contacting the store
+   */
+  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
+
+  /**
+   * List the cluster replication configs of all remote slave clusters (whether they are
+   * enabled/disabled or connected/disconnected).
+   * @return A map of peer ids to peer configurations
+   */
+  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
+
+  /**
+   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+   * connected/disconnected).
+   * @return A list of peer ids
+   */
+  List<String> getAllPeerIds();
+
+  /**
+   * Returns the configured ReplicationPeerConfig for this peerId
+   * @param peerId a short name that identifies the cluster
+   * @return ReplicationPeerConfig for the peer
+   */
+  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
+
+  /**
+   * Returns the configuration needed to talk to the remote slave cluster.
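+   * In the ZooKeeper-backed implementation below, this expands the peer's cluster key into a
+   * Configuration and overlays any per-peer configuration entries stored with the peer.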
+ * @param peerId a short that identifies the cluster + * @return the configuration for the peer cluster, null if it was unable to get the configuration + */ + Pair getPeerConf(String peerId) throws ReplicationException; + + /** + * Update the peerConfig for the a given peer cluster + * @param id a short that identifies the cluster + * @param peerConfig new config for the peer cluster + * @throws ReplicationException + */ + void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java new file mode 100644 index 0000000..751e454 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -0,0 +1,546 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The + * peers znode contains a list of all peer replication clusters and the current replication state of + * those clusters. 
It has one child peer znode for each peer cluster. The peer znode is named with + * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the + * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of + * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase. + * For example: + * + * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase] + * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase] + * + * Each of these peer znodes has a child znode that indicates whether or not replication is enabled + * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a + * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the + * ReplicationPeer.PeerStateTracker class. For example: + * + * /hbase/replication/peers/1/peer-state [Value: ENABLED] + * + * Each of these peer znodes has a child znode that indicates which data will be replicated + * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a + * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker + * class. For example: + * + * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"] + */ +@InterfaceAudience.Private +public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers { + + // Map of peer clusters keyed by their id + private Map peerClusters; + private final ReplicationQueuesClient queuesClient; + private Abortable abortable; + + private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); + + public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, + final ReplicationQueuesClient queuesClient, Abortable abortable) { + super(zk, conf, abortable); + this.abortable = abortable; + this.peerClusters = new ConcurrentHashMap<>(); + this.queuesClient = queuesClient; + } + + @Override + public void init() throws ReplicationException { + try { + if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize replication peers", e); + } + addExistingPeers(); + } + + @Override + public void registerPeer(String id, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try { + if (peerExists(id)) { + throw new IllegalArgumentException("Cannot add a peer with id=" + id + + " because that id already exists."); + } + + if(id.contains("-")){ + throw new IllegalArgumentException("Found invalid peer name:" + id); + } + + if (peerConfig.getClusterKey() != null) { + try { + ZKConfig.validateClusterKey(peerConfig.getClusterKey()); + } catch (IOException ioe) { + throw new IllegalArgumentException(ioe.getMessage()); + } + } + + checkQueuesDeleted(id); + + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); + + List listOfOps = new ArrayList<>(2); + ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), + ReplicationSerDeHelper.toByteArray(peerConfig)); + // b/w PeerWatcher and ReplicationZookeeper#add method to create the + // peer-state znode. This happens while adding a peer + // The peer state data is set as "ENABLED" by default. 
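+      // op1 (the peer's config znode) and op2 (its peer-state child, pre-set to ENABLED) are
+      // committed together through ZKUtil.multiOrSequential below, so a peer is never visible
+      // without a state znode.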
+ ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES); + listOfOps.add(op1); + listOfOps.add(op2); + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + // A peer is enabled by default + } catch (KeeperException e) { + throw new ReplicationException("Could not add peer with id=" + id + + ", peerConfif=>" + peerConfig, e); + } + } + + @Override + public void unregisterPeer(String id) throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("Cannot remove peer with id=" + id + + " because that id does not exist."); + } + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); + } catch (KeeperException e) { + throw new ReplicationException("Could not remove peer with id=" + id, e); + } + } + + @Override + public void enablePeer(String id) throws ReplicationException { + changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED); + LOG.info("peer " + id + " is enabled"); + } + + @Override + public void disablePeer(String id) throws ReplicationException { + changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED); + LOG.info("peer " + id + " is disabled"); + } + + @Override + public Map> getPeerTableCFsConfig(String id) throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + try { + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); + } + return rpc.getTableCFsMap(); + } catch (Exception e) { + throw new ReplicationException(e); + } + } catch (KeeperException e) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e); + } + } + + @Override + public void setPeerTableCFsConfig(String id, + Map> tableCFs) + throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + + " does not exist."); + } + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); + } + rpc.setTableCFsMap(tableCFs); + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(rpc)); + LOG.info("Peer tableCFs with id= " + id + " is now " + + ReplicationSerDeHelper.convertToString(tableCFs)); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); + } + } + + @Override + public boolean getStatusOfPeer(String id) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { + throw new IllegalArgumentException("Peer with id= " + id + " is not cached"); + } + return replicationPeer.getPeerState() == PeerState.ENABLED; + } + + @Override + public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + String peerStateZNode = getPeerStateNode(id); + try { + return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + } catch (KeeperException e) { + throw new ReplicationException(e); + } catch (DeserializationException e) { + throw new ReplicationException(e); + } + } catch (KeeperException e) { + throw new ReplicationException("Unable to get status of the peer with id=" + id + + " 
from backing store", e); + } catch (InterruptedException e) { + throw new ReplicationException(e); + } + } + + @Override + public Map getAllPeerConfigs() { + Map peers = new TreeMap<>(); + List ids = null; + try { + ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); + for (String id : ids) { + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); + if (peerConfig == null) { + LOG.warn("Failed to get replication peer configuration of clusterid=" + id + + " znode content, continuing."); + continue; + } + peers.put(id, peerConfig); + } + } catch (KeeperException e) { + this.abortable.abort("Cannot get the list of peers ", e); + } catch (ReplicationException e) { + this.abortable.abort("Cannot get the list of peers ", e); + } + return peers; + } + + @Override + public ReplicationPeer getConnectedPeer(String peerId) { + return peerClusters.get(peerId); + } + + @Override + public Set getConnectedPeerIds() { + return peerClusters.keySet(); // this is not thread-safe + } + + /** + * Returns a ReplicationPeerConfig from the znode or null for the given peerId. + */ + @Override + public ReplicationPeerConfig getReplicationPeerConfig(String peerId) + throws ReplicationException { + String znode = getPeerNode(peerId); + byte[] data = null; + try { + data = ZKUtil.getData(this.zookeeper, znode); + } catch (InterruptedException e) { + LOG.warn("Could not get configuration for peer because the thread " + + "was interrupted. peerId=" + peerId); + Thread.currentThread().interrupt(); + return null; + } catch (KeeperException e) { + throw new ReplicationException("Error getting configuration for peer with id=" + + peerId, e); + } + if (data == null) { + LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId); + return null; + } + + try { + return ReplicationSerDeHelper.parsePeerFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed to parse cluster key from peerId=" + peerId + + ", specifically the content from the following znode: " + znode); + return null; + } + } + + @Override + public Pair getPeerConf(String peerId) + throws ReplicationException { + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId); + + if (peerConfig == null) { + return null; + } + + Configuration otherConf; + try { + otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); + } catch (IOException e) { + LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e); + return null; + } + + if (!peerConfig.getConfiguration().isEmpty()) { + CompoundConfiguration compound = new CompoundConfiguration(); + compound.add(otherConf); + compound.addStringMap(peerConfig.getConfiguration()); + return new Pair<>(peerConfig, compound); + } + + return new Pair<>(peerConfig, otherConf); + } + + @Override + public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) + throws ReplicationException { + ReplicationPeer peer = getConnectedPeer(id); + if (peer == null){ + throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); + } + ReplicationPeerConfig existingConfig = peer.getPeerConfig(); + if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && + !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ + throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." 
+ + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" + + newConfig.getClusterKey() + + "'"); + } + String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); + if (newConfig.getReplicationEndpointImpl() != null && + !newConfig.getReplicationEndpointImpl().isEmpty() && + !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ + throw new ReplicationException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + existingConfig.getReplicationEndpointImpl() + + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); + } + //Update existingConfig's peer config and peer data with the new values, but don't touch config + // or data that weren't explicitly changed + existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); + existingConfig.getPeerData().putAll(newConfig.getPeerData()); + existingConfig.setTableCFsMap(newConfig.getTableCFsMap()); + existingConfig.setNamespaces(newConfig.getNamespaces()); + existingConfig.setBandwidth(newConfig.getBandwidth()); + + try { + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(existingConfig)); + } + catch(KeeperException ke){ + throw new ReplicationException("There was a problem trying to save changes to the " + + "replication peer " + id, ke); + } + } + + /** + * List all registered peer clusters and set a watch on their znodes. + */ + @Override + public List getAllPeerIds() { + List ids = null; + try { + ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode); + } catch (KeeperException e) { + this.abortable.abort("Cannot get the list of peers ", e); + } + return ids; + } + + /** + * A private method used during initialization. This method attempts to add all registered + * peer clusters. This method does not set a watch on the peer cluster znodes. + */ + private void addExistingPeers() throws ReplicationException { + List znodes = null; + try { + znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); + } catch (KeeperException e) { + throw new ReplicationException("Error getting the list of peer clusters.", e); + } + if (znodes != null) { + for (String z : znodes) { + createAndAddPeer(z); + } + } + } + + @Override + public boolean peerConnected(String peerId) throws ReplicationException { + return createAndAddPeer(peerId); + } + + @Override + public void peerDisconnected(String peerId) { + ReplicationPeer rp = this.peerClusters.get(peerId); + if (rp != null) { + ((ConcurrentMap) peerClusters).remove(peerId, rp); + } + } + + /** + * Attempt to connect to a new remote slave cluster. + * @param peerId a short that identifies the cluster + * @return true if a new connection was made, false if no new connection was made. 
+ */ + public boolean createAndAddPeer(String peerId) throws ReplicationException { + if (peerClusters == null) { + return false; + } + if (this.peerClusters.containsKey(peerId)) { + return false; + } + + ReplicationPeerZKImpl peer = null; + try { + peer = createPeer(peerId); + } catch (Exception e) { + throw new ReplicationException("Error adding peer with id=" + peerId, e); + } + if (peer == null) { + return false; + } + ReplicationPeerZKImpl previous = + ((ConcurrentMap) peerClusters).putIfAbsent(peerId, peer); + if (previous == null) { + LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey()); + } else { + LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() + + ", new cluster=" + peer.getPeerConfig().getClusterKey()); + } + return true; + } + + /** + * Update the state znode of a peer cluster. + * @param id + * @param state + */ + private void changePeerState(String id, ReplicationProtos.ReplicationState.State state) + throws ReplicationException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id + + " does not exist."); + } + String peerStateZNode = getPeerStateNode(id); + byte[] stateBytes = + (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES + : DISABLED_ZNODE_BYTES; + if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) { + ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes); + } else { + ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes); + } + LOG.info("Peer with id= " + id + " is now " + state.name()); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change state of the peer with id=" + id, e); + } + } + + /** + * Helper method to connect to a peer + * @param peerId peer's identifier + * @return object representing the peer + * @throws ReplicationException + */ + private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException { + Pair pair = getPeerConf(peerId); + if (pair == null) { + return null; + } + Configuration peerConf = pair.getSecond(); + + ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, + peerConf, peerId, pair.getFirst(), abortable); + try { + peer.startStateTracker(this.getPeerStateNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Error starting the peer state tracker for peerId=" + + peerId, e); + } + + try { + peer.startPeerConfigTracker(this.getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + + peerId, e); + } + + return peer; + } + + private void checkQueuesDeleted(String peerId) throws ReplicationException { + if (queuesClient == null) return; + try { + List replicators = queuesClient.getListOfReplicators(); + if (replicators == null || replicators.isEmpty()) { + return; + } + for (String replicator : replicators) { + List queueIds = queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + throw new ReplicationException("undeleted queue for peerId: " + peerId + + ", replicator: " + replicator + ", queueId: " + queueId); + } + } + } + // Check for hfile-refs queue + if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode) + && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) { + throw new ReplicationException("Undeleted queue for peerId: " + peerId + + ", found 
in hfile-refs node path " + hfileRefsZNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java new file mode 100644 index 0000000..1403f6d --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; + +/** + * This class is responsible for the parsing logic for a znode representing a queue. + * It will extract the peerId if it's recovered as well as the dead region servers + * that were part of the queue's history. + */ +@InterfaceAudience.Private +public class ReplicationQueueInfo { + private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class); + + private final String peerId; + private final String peerClusterZnode; + private boolean queueRecovered; + // List of all the dead region servers that had this queue (if recovered) + private List deadRegionServers = new ArrayList<>(); + + /** + * The passed znode will be either the id of the peer cluster or + * the handling story of that queue in the form of id-servername-* + */ + public ReplicationQueueInfo(String znode) { + this.peerClusterZnode = znode; + String[] parts = znode.split("-", 2); + this.queueRecovered = parts.length != 1; + this.peerId = this.queueRecovered ? + parts[0] : peerClusterZnode; + if (parts.length >= 2) { + // extract dead servers + extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); + } + } + + /** + * Parse dead server names from znode string servername can contain "-" such as + * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following + * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-... 
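+   * For instance, the dead-server portion "ip-10-46-221-101.ec2.internal,52170,1364333181125" is
+   * kept as one server name instead of being split at the dashes inside the hostname.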
+ */ + private static void + extractDeadServersFromZNodeString(String deadServerListStr, List result) { + + if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return; + + // valid server name delimiter "-" has to be after "," in a server name + int seenCommaCnt = 0; + int startIndex = 0; + int len = deadServerListStr.length(); + + for (int i = 0; i < len; i++) { + switch (deadServerListStr.charAt(i)) { + case ',': + seenCommaCnt += 1; + break; + case '-': + if(seenCommaCnt>=2) { + if (i > startIndex) { + String serverName = deadServerListStr.substring(startIndex, i); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name:" + serverName); + } + startIndex = i + 1; + } + seenCommaCnt = 0; + } + break; + default: + break; + } + } + + // add tail + if(startIndex < len - 1){ + String serverName = deadServerListStr.substring(startIndex, len); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name at the end:" + serverName); + } + } + + LOG.debug("Found dead servers:" + result); + } + + public List getDeadRegionServers() { + return Collections.unmodifiableList(this.deadRegionServers); + } + + public String getPeerId() { + return this.peerId; + } + + public String getPeerClusterZnode() { + return this.peerClusterZnode; + } + + public boolean isQueueRecovered() { + return queueRecovered; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java new file mode 100644 index 0000000..be5a590 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.SortedSet; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Pair; + +/** + * This provides an interface for maintaining a region server's replication queues. These queues + * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled) + * that still need to be replicated to remote clusters. + */ +@InterfaceAudience.Private +public interface ReplicationQueues { + + /** + * Initialize the region server replication queue interface. 
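+   * (For a ZooKeeper-backed implementation this would typically create the znode under which this
+   * region server's queues are tracked; that implementation is not shown in this excerpt.)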
+   * @param serverName The server name of the region server that owns the replication queues this
+   *          interface manages.
+   */
+  void init(String serverName) throws ReplicationException;
+
+  /**
+   * Remove a replication queue.
+   * @param queueId a String that identifies the queue.
+   */
+  void removeQueue(String queueId);
+
+  /**
+   * Add a new WAL file to the given queue. If the queue does not exist it is created.
+   * @param queueId a String that identifies the queue.
+   * @param filename name of the WAL
+   */
+  void addLog(String queueId, String filename) throws ReplicationException;
+
+  /**
+   * Remove a WAL file from the given queue.
+   * @param queueId a String that identifies the queue.
+   * @param filename name of the WAL
+   */
+  void removeLog(String queueId, String filename);
+
+  /**
+   * Set the current position for a specific WAL in a given queue.
+   * @param queueId a String that identifies the queue
+   * @param filename name of the WAL
+   * @param position the current position in the file
+   */
+  void setLogPosition(String queueId, String filename, long position);
+
+  /**
+   * Get the current position for a specific WAL in a given queue.
+   * @param queueId a String that identifies the queue
+   * @param filename name of the WAL
+   * @return the current position in the file
+   */
+  long getLogPosition(String queueId, String filename) throws ReplicationException;
+
+  /**
+   * Remove all replication queues for this region server.
+   */
+  void removeAllQueues();
+
+  /**
+   * Get a list of all WALs in the given queue.
+   * @param queueId a String that identifies the queue
+   * @return a list of WALs, null if no such queue exists for this server
+   */
+  List<String> getLogsInQueue(String queueId);
+
+  /**
+   * Get a list of all queues for this region server.
+   * @return a list of queueIds, an empty list if this region server is dead and has no outstanding
+   *         queues
+   */
+  List<String> getAllQueues();
+
+  /**
+   * Get queueIds from a dead region server whose queues have not been claimed by other region
+   * servers.
+   * @return empty if the queue exists but has no children, null if the queue does not exist.
+   */
+  List<String> getUnClaimedQueueIds(String regionserver);
+
+  /**
+   * Take ownership of the queue identified by queueId which belonged to a dead region server.
+   * @param regionserver the id of the dead region server
+   * @param queueId the id of the queue
+   * @return the new peer id and a SortedSet of WALs in its queue, or null if there is no unclaimed
+   *         queue.
+   */
+  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
+
+  /**
+   * Remove the znode of the region server if its queue is empty.
+   * @param regionserver the id of the region server
+   */
+  void removeReplicatorIfQueueIsEmpty(String regionserver);
+
+  /**
+   * Get a list of all region servers that have outstanding replication queues. These servers could
+   * be alive, dead or from a previous run of the cluster.
+   * @return a list of server names
+   */
+  List<String> getListOfReplicators();
+
+  /**
+   * Checks if the provided znode is the same as this region server's.
+   * @param regionserver the id of the region server
+   * @return true if this is this region server's znode
+   */
+  boolean isThisOurRegionServer(String regionserver);
+
+  /**
+   * Add a peer to the hfile reference queue if the peer does not exist.
+   * @param peerId peer cluster id to be added
+   * @throws ReplicationException if it fails to add a peer id to the hfile reference queue
+   */
+  void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+  /**
+   * Remove a peer from the hfile reference queue.
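 * <p>Together with {@link #addPeerToHFileRefs(String)}, {@link #addHFileRefs(String, List)} and
 * {@link #removeHFileRefs(String, List)} this keeps the bookkeeping for bulk-loaded files,
 * roughly as follows (the peer id, paths and file names are illustrative):
 * <pre>
 * queues.addPeerToHFileRefs("2");
 * queues.addHFileRefs("2", Arrays.asList(new Pair&lt;&gt;(stagingHFilePath, regionHFilePath)));
 * // ... after the hfile has been shipped to peer "2" ...
 * queues.removeHFileRefs("2", Arrays.asList(regionHFilePath.getName()));
 * queues.removePeerFromHFileRefs("2");
 * </pre>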
+ * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId); + + /** + * Add new hfile references to the queue. + * @param peerId peer cluster id to which the hfiles need to be replicated + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue } + * @throws ReplicationException if fails to add a hfile reference + */ + void addHFileRefs(String peerId, List> pairs) throws ReplicationException; + + /** + * Remove hfile references from the queue. + * @param peerId peer cluster id from which this hfile references needs to be removed + * @param files list of hfile references to be removed + */ + void removeHFileRefs(String peerId, List files); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java new file mode 100644 index 0000000..12fc6a1 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various + * ReplicationQueues Implementations with different constructor arguments by reflection. 
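 * <p>For example, a caller fills in the configuration, an Abortable and the ZooKeeper watcher and
 * hands the wrapper to a factory (such as ReplicationFactory in this module) that reflectively
 * instantiates the configured implementation; a sketch:
 * <pre>
 * ReplicationQueuesArguments args = new ReplicationQueuesArguments(conf, abortable, zkWatcher);
 * ReplicationQueues queues = ReplicationFactory.getReplicationQueues(args);  // assumed factory method
 * </pre>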
+ */ +@InterfaceAudience.Private +public class ReplicationQueuesArguments { + + private ZooKeeperWatcher zk; + private Configuration conf; + private Abortable abort; + + public ReplicationQueuesArguments(Configuration conf, Abortable abort) { + this.conf = conf; + this.abort = abort; + } + + public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) { + this(conf, abort); + setZk(zk); + } + + public ZooKeeperWatcher getZk() { + return zk; + } + + public void setZk(ZooKeeperWatcher zk) { + this.zk = zk; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Abortable getAbortable() { + return abort; + } + + public void setAbortable(Abortable abort) { + this.abort = abort; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java new file mode 100644 index 0000000..6d8900e --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * This provides an interface for clients of replication to view replication queues. These queues + * keep track of the sources(WALs/HFile references) that still need to be replicated to remote + * clusters. + */ +@InterfaceAudience.Private +public interface ReplicationQueuesClient { + + /** + * Initialize the replication queue client interface. + */ + public void init() throws ReplicationException; + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + * @throws KeeperException zookeeper exception + */ + List getListOfReplicators() throws KeeperException; + + /** + * Get a list of all WALs in the given queue on the given region server. 
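 * <p>A read-only walk over all queues, e.g. for log cleaning or monitoring, might look like this
 * sketch (using only methods declared on this interface):
 * <pre>
 * ReplicationQueuesClient client = ...;  // e.g. built from ReplicationQueuesClientArguments
 * client.init();
 * for (String rs : client.getListOfReplicators()) {
 *   List&lt;String&gt; queueIds = client.getAllQueues(rs);
 *   if (queueIds == null) continue;  // this region server is not a replicator (any more)
 *   for (String queueId : queueIds) {
 *     List&lt;String&gt; wals = client.getLogsInQueue(rs, queueId);
 *   }
 * }
 * </pre>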
+ * @param serverName the server name of the region server that owns the queue + * @param queueId a String that identifies the queue + * @return a list of WALs, null if this region server is dead and has no outstanding queues + * @throws KeeperException zookeeper exception + */ + List getLogsInQueue(String serverName, String queueId) throws KeeperException; + + /** + * Get a list of all queues for the specified region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds, null if this region server is not a replicator. + */ + List getAllQueues(String serverName) throws KeeperException; + + /** + * Load all wals in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in the zookeeper at the start of this call even there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. + */ + Set getAllWALs() throws KeeperException; + + /** + * Get the change version number of replication hfile references node. This can be used as + * optimistic locking to get a consistent snapshot of the replication queues of hfile references. + * @return change version number of hfile references node + */ + int getHFileRefsNodeChangeVersion() throws KeeperException; + + /** + * Get list of all peers from hfile reference queue. + * @return a list of peer ids + * @throws KeeperException zookeeper exception + */ + List getAllPeersFromHFileRefsQueue() throws KeeperException; + + /** + * Get a list of all hfile references in the given peer. + * @param peerId a String that identifies the peer + * @return a list of hfile references, null if not found any + * @throws KeeperException zookeeper exception + */ + List getReplicableHFiles(String peerId) throws KeeperException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java new file mode 100644 index 0000000..834f831 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct + * various ReplicationQueuesClient Implementations with different constructor arguments by + * reflection. + */ +@InterfaceAudience.Private +public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, + ZooKeeperWatcher zk) { + super(conf, abort, zk); + } + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { + super(conf, abort); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e2ce252b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java new file mode 100644 index 0000000..1981131 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +@InterfaceAudience.Private +public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements + ReplicationQueuesClient { + + Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class); + + public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) { + this(args.getZk(), args.getConf(), args.getAbortable()); + } + + public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) { + super(zk, conf, abortable); + } + + @Override + public void init() throws ReplicationException { + try { + if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Internal error while initializing a queues client", e); + } + } + + @Override + public List getLogsInQueue(String serverName, String queueId) throws KeeperException { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + znode = ZKUtil.joinZNode(znode, queueId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of wals for queueId=" + queueId + + " and serverName=" + serverName, e); + throw e; + } + return result; + } + + @Override + public List getAllQueues(String serverName) throws KeeperException { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); + throw e; + } + return result; + } + + @Override + public Set getAllWALs() throws KeeperException { + /** + * Load all wals in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in the zookeeper at the start of this call even there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. 
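 * The snapshot is obtained optimistically: the cversion of the queues znode is read before and
 * after the scan below, and the whole scan is retried whenever the two versions differ, i.e.
 * whenever a queue was added, removed or failed over while scanning.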
+ */ + for (int retry = 0; ; retry++) { + int v0 = getQueuesZNodeCversion(); + List rss = getListOfReplicators(); + if (rss == null || rss.isEmpty()) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return ImmutableSet.of(); + } + Set wals = Sets.newHashSet(); + for (String rs : rss) { + List listOfPeers = getAllQueues(rs); + // if rs just died, this will be null + if (listOfPeers == null) { + continue; + } + for (String id : listOfPeers) { + List peersWals = getLogsInQueue(rs, id); + if (peersWals != null) { + wals.addAll(peersWals); + } + } + } + int v1 = getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } + + public int getQueuesZNodeCversion() throws KeeperException { + try { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); + return stat.getCversion(); + } catch (KeeperException e) { + this.abortable.abort("Failed to get stat of replication rs node", e); + throw e; + } + } + + @Override + public int getHFileRefsNodeChangeVersion() throws KeeperException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat); + } catch (KeeperException e) { + this.abortable.abort("Failed to get stat of replication hfile references node.", e); + throw e; + } + return stat.getCversion(); + } + + @Override + public List getAllPeersFromHFileRefsQueue() throws KeeperException { + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of all peers in hfile references node.", e); + throw e; + } + return result; + } + + @Override + public List getReplicableHFiles(String peerId) throws KeeperException { + String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e); + throw e; + } + return result; + } +}
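The hfile reference accessors can be used with the same optimistic pattern as getAllWALs(): read the cversion of the hfile-refs znode, enumerate peers and references, then re-check the version. A minimal sketch against the API added in this patch (variable names are illustrative, error handling elided):

    ReplicationQueuesClient client =
        new ReplicationQueuesClientZKImpl(zkWatcher, conf, abortable);
    client.init();
    int v0 = client.getHFileRefsNodeChangeVersion();
    Set<String> refs = new HashSet<>();
    List<String> peers = client.getAllPeersFromHFileRefsQueue();
    if (peers != null) {
      for (String peerId : peers) {
        List<String> files = client.getReplicableHFiles(peerId);
        if (files != null) {
          refs.addAll(files);
        }
      }
    }
    if (v0 != client.getHFileRefsNodeChangeVersion()) {
      // a peer or reference changed while scanning; retry or discard the snapshot
    }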