From: chenheng@apache.org
To: commits@hbase.apache.org
Date: Tue, 29 Mar 2016 02:55:10 -0000
Subject: [3/3] hbase git commit: HBASE-11393 Replication TableCfs should be a PB object rather than a string

HBASE-11393 Replication TableCfs should be a PB object rather than a string

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f39baf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f39baf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f39baf0

Branch: refs/heads/master
Commit: 7f39baf0f4572ff209837d7de5d37554851ecbb7
Parents: 0520097
Author: chenheng
Authored: Fri Mar 25 14:16:47 2016 +0800
Committer: chenheng
Committed: Tue Mar 29 10:25:29 2016 +0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  170 +--
 .../replication/ReplicationSerDeHelper.java     |  315 +++++
 .../hbase/replication/ReplicationPeer.java      |    1 +
 .../replication/ReplicationPeerConfig.java      |   16 +-
 .../replication/ReplicationPeerZKImpl.java      |   76 +-
 .../hbase/replication/ReplicationPeers.java     |   19 +-
 .../replication/ReplicationPeersZKImpl.java     |  163 +--
 .../replication/ReplicationStateZKBase.java     |   19 +
 .../protobuf/generated/ZooKeeperProtos.java     | 1155 +++++++++++++++++-
 .../src/main/protobuf/ZooKeeper.proto           |    6 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   10 +-
 .../replication/master/TableCFsUpdater.java     |  122 ++
 .../hbase/client/TestReplicaWithCluster.java    |    6 +-
 .../replication/TestReplicationAdmin.java       |  195 +--
 .../cleaner/TestReplicationHFileCleaner.java    |    2 +-
 .../replication/TestMasterReplication.java      |   11 +-
 .../replication/TestMultiSlaveReplication.java  |   10 +-
 .../replication/TestPerTableCFReplication.java  |  158 ++-
 .../hbase/replication/TestReplicationBase.java  |    4 +-
 .../replication/TestReplicationSmallTests.java  |    5 +-
 .../replication/TestReplicationStateBasic.java  |   14 +-
 .../replication/TestReplicationSyncUpTool.java  |    4 +-
 .../TestReplicationTrackerZKImpl.java           |   10 +-
 .../replication/TestReplicationWithTags.java    |    6 +-
 .../replication/master/TestTableCFsUpdater.java |  210 ++++
 ...sibilityLabelReplicationWithExpAsString.java |    9 +-
 .../TestVisibilityLabelsReplication.java        |    5 +-
 .../hadoop/hbase/util/TestHBaseFsckOneRS.java   |    5 +-
 .../src/main/ruby/hbase/replication_admin.rb    |   44 +-
 .../src/main/ruby/shell/commands/add_peer.rb    |    4 +-
 .../ruby/shell/commands/append_peer_tableCFs.rb |    2 +-
 .../ruby/shell/commands/remove_peer_tableCFs.rb |    4 +-
 .../ruby/shell/commands/set_peer_tableCFs.rb    |    5 +-
 33 files changed, 2309 insertions(+), 476 deletions(-)
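For readers skimming the patch below, here is a minimal sketch of what the change means for callers of ReplicationAdmin: the table-to-column-family mapping is no longer passed around as a "table1:cf1,cf2;table2" string but travels inside ReplicationPeerConfig, which is serialized to the peer znode as protobuf. The peer id, cluster key, and table/family names are invented for illustration, and the snippet assumes the post-commit client API; it is not part of the patch itself.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class AddPeerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin admin = new ReplicationAdmin(conf);
        try {
          // Before this commit: table-cfs was a raw string, e.g.
          //   admin.addPeer("1", "zk1:2181:/hbase", "table1:cf1,cf2;table2");

          // After this commit: table-cfs is a typed map carried by ReplicationPeerConfig.
          Map<TableName, List<String>> tableCfs = new HashMap<>();
          tableCfs.put(TableName.valueOf("table1"), Arrays.asList("cf1", "cf2"));
          tableCfs.put(TableName.valueOf("table2"), null); // null = replicate all column families of this table

          ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
          peerConfig.setClusterKey("zk1:2181:/hbase");
          peerConfig.setTableCFsMap(tableCfs);
          admin.addPeer("1", peerConfig);  // two-argument overload introduced by this patch
        } finally {
          admin.close();
        }
      }
    }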
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index dcf1957..8ee3a22 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -166,27 +166,6 @@ public class ReplicationAdmin implements Closeable {
   }
 
   /**
-   * Add a new peer cluster to replicate to.
-   * @param id a short name that identifies the cluster
-   * @param clusterKey the concatenation of the slave cluster's
-   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-   * @throws IllegalStateException if there's already one slave since
-   *         multi-slave isn't supported yet.
-   * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
-   */
-  @Deprecated
-  public void addPeer(String id, String clusterKey) throws ReplicationException {
-    this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
-  }
-
-  @Deprecated
-  public void addPeer(String id, String clusterKey, String tableCFs)
-    throws ReplicationException {
-    this.replicationPeers.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
-  }
-
-  /**
    * Add a new remote slave cluster for replication.
    * @param id a short name that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
@@ -194,83 +173,36 @@
   *          A map from tableName to column family names. An empty collection can be passed
   *          to indicate replicating all column families. Pass null for replicating all table and column
   *          families
+  * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
+  *             use {@link #addPeer(String, ReplicationPeerConfig)} instead.
*/ + @Deprecated public void addPeer(String id, ReplicationPeerConfig peerConfig, Map> tableCfs) throws ReplicationException { - this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs)); - } - - public static Map> parseTableCFsFromConfig(String tableCFsConfig) { - if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { - return null; - } - - Map> tableCFsMap = null; - // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393 - // parse out (table, cf-list) pairs from tableCFsConfig - // format: "table1:cf1,cf2;table2:cfA,cfB" - String[] tables = tableCFsConfig.split(";"); - for (String tab : tables) { - // 1 ignore empty table config - tab = tab.trim(); - if (tab.length() == 0) { - continue; - } - // 2 split to "table" and "cf1,cf2" - // for each table: "table:cf1,cf2" or "table" - String[] pair = tab.split(":"); - String tabName = pair[0].trim(); - if (pair.length > 2 || tabName.length() == 0) { - LOG.error("ignore invalid tableCFs setting: " + tab); - continue; - } - - // 3 parse "cf1,cf2" part to List - List cfs = null; - if (pair.length == 2) { - String[] cfsList = pair[1].split(","); - for (String cf : cfsList) { - String cfName = cf.trim(); - if (cfName.length() > 0) { - if (cfs == null) { - cfs = new ArrayList(); - } - cfs.add(cfName); - } - } - } - - // 4 put > to map - if (tableCFsMap == null) { - tableCFsMap = new HashMap>(); - } - tableCFsMap.put(TableName.valueOf(tabName), cfs); + if (tableCfs != null) { + peerConfig.setTableCFsMap(tableCfs); } - return tableCFsMap; + this.replicationPeers.addPeer(id, peerConfig); } - @VisibleForTesting - static String getTableCfsStr(Map> tableCfs) { - String tableCfsStr = null; - if (tableCfs != null) { - // Format: table1:cf1,cf2;table2:cfA,cfB;table3 - StringBuilder builder = new StringBuilder(); - for (Entry> entry : tableCfs.entrySet()) { - if (builder.length() > 0) { - builder.append(";"); - } - builder.append(entry.getKey()); - if (entry.getValue() != null && !entry.getValue().isEmpty()) { - builder.append(":"); - builder.append(StringUtils.join(entry.getValue(), ",")); - } - } - tableCfsStr = builder.toString(); - } - return tableCfsStr; + /** + * Add a new remote slave cluster for replication. + * @param id a short name that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + */ + public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + this.replicationPeers.addPeer(id, peerConfig); } /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0 + * */ + @Deprecated + public static Map> parseTableCFsFromConfig(String tableCFsConfig) { + return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); + } + + /** * Removes a peer cluster and stops the replication to it. * @param id a short name that identifies the cluster */ @@ -302,22 +234,6 @@ public class ReplicationAdmin implements Closeable { return this.replicationPeers.getAllPeerIds().size(); } - /** - * Map of this cluster's peers for display. 
- * @return A map of peer ids to peer cluster keys - * @deprecated use {@link #listPeerConfigs()} - */ - @Deprecated - public Map listPeers() { - Map peers = this.listPeerConfigs(); - Map ret = new HashMap(peers.size()); - - for (Map.Entry entry : peers.entrySet()) { - ret.put(entry.getKey(), entry.getValue().getClusterKey()); - } - return ret; - } - public Map listPeerConfigs() { return this.replicationPeers.getAllPeerConfigs(); } @@ -329,19 +245,12 @@ public class ReplicationAdmin implements Closeable { /** * Get the replicable table-cf config of the specified peer. * @param id a short name that identifies the cluster - */ - public String getPeerTableCFs(String id) throws ReplicationException { - return this.replicationPeers.getPeerTableCFsConfig(id); - } - - /** - * Set the replicable table-cf config of the specified peer - * @param id a short name that identifies the cluster - * @deprecated use {@link #setPeerTableCFs(String, Map)} - */ + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, + * use {@link #getPeerConfig(String)} instead. + * */ @Deprecated - public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException { - this.replicationPeers.setPeerTableCFsConfig(id, tableCFs); + public String getPeerTableCFs(String id) throws ReplicationException { + return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id)); } /** @@ -349,9 +258,12 @@ public class ReplicationAdmin implements Closeable { * @param id a short that identifies the cluster * @param tableCfs table-cfs config str * @throws ReplicationException + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, + * use {@link #appendPeerTableCFs(String, Map)} instead. */ + @Deprecated public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException { - appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs)); + appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); } /** @@ -365,12 +277,11 @@ public class ReplicationAdmin implements Closeable { if (tableCfs == null) { throw new ReplicationException("tableCfs is null"); } - Map> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + Map> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); if (preTableCfs == null) { setPeerTableCFs(id, tableCfs); return; } - for (Map.Entry> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); Collection appendCfs = entry.getValue(); @@ -382,6 +293,7 @@ public class ReplicationAdmin implements Closeable { Set cfSet = new HashSet(cfs); cfSet.addAll(appendCfs); preTableCfs.put(table, Lists.newArrayList(cfSet)); + } } else { if (appendCfs == null || appendCfs.isEmpty()) { @@ -399,9 +311,12 @@ public class ReplicationAdmin implements Closeable { * @param id a short name that identifies the cluster * @param tableCf table-cfs config str * @throws ReplicationException + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, + * use {@link #removePeerTableCFs(String, Map)} instead. 
*/ + @Deprecated public void removePeerTableCFs(String id, String tableCf) throws ReplicationException { - removePeerTableCFs(id, parseTableCFsFromConfig(tableCf)); + removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); } /** @@ -415,12 +330,12 @@ public class ReplicationAdmin implements Closeable { if (tableCfs == null) { throw new ReplicationException("tableCfs is null"); } - - Map> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + Map> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); if (preTableCfs == null) { throw new ReplicationException("Table-Cfs for peer" + id + " is null"); } for (Map.Entry> entry: tableCfs.entrySet()) { + TableName table = entry.getKey(); Collection removeCfs = entry.getValue(); if (preTableCfs.containsKey(table)) { @@ -444,6 +359,7 @@ public class ReplicationAdmin implements Closeable { } } else { throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); + } } setPeerTableCFs(id, preTableCfs); @@ -459,7 +375,7 @@ public class ReplicationAdmin implements Closeable { */ public void setPeerTableCFs(String id, Map> tableCfs) throws ReplicationException { - this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs)); + this.replicationPeers.setPeerTableCFsConfig(id, tableCfs); } /** @@ -645,8 +561,8 @@ public class ReplicationAdmin implements Closeable { try { Pair pair = this.replicationPeers.getPeerConf(peerId); Configuration peerConf = pair.getSecond(); - ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), - parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); + ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(), + peerId, pair.getFirst(), this.connection); listOfPeers.add(peer); } catch (ReplicationException e) { LOG.warn("Failed to get valid replication peers. " http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java new file mode 100644 index 0000000..9682f89 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java @@ -0,0 +1,315 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.replication; + +import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; + +/** + * Helper for TableCFs Operations. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public final class ReplicationSerDeHelper { + + private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class); + + private ReplicationSerDeHelper() {} + + /** convert map to TableCFs Object */ + public static ZooKeeperProtos.TableCF[] convert( + Map> tableCfs) { + if (tableCfs == null) { + return null; + } + List tableCFList = new ArrayList<>(); + ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder(); + for (Map.Entry> entry : tableCfs.entrySet()) { + tableCFBuilder.clear(); + tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); + Collection v = entry.getValue(); + if (v != null && !v.isEmpty()) { + for (String value : entry.getValue()) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]); + } + + public static String convertToString(Map> tableCfs) { + if (tableCfs == null) { + return null; + } + return convert(convert(tableCfs)); + } + + /** + * Convert string to TableCFs Object. + * This is only for read TableCFs information from TableCF node. + * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 
+ * */ + public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) { + if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { + return null; + } + List tableCFList = new ArrayList<>(); + ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder(); + + String[] tables = tableCFsConfig.split(";"); + for (String tab : tables) { + // 1 ignore empty table config + tab = tab.trim(); + if (tab.length() == 0) { + continue; + } + // 2 split to "table" and "cf1,cf2" + // for each table: "table#cf1,cf2" or "table" + String[] pair = tab.split(":"); + String tabName = pair[0].trim(); + if (pair.length > 2 || tabName.length() == 0) { + LOG.info("incorrect format:" + tableCFsConfig); + continue; + } + + tableCFBuilder.clear(); + // split namespace from tableName + String ns = "default"; + String tName = tabName; + String[] dbs = tabName.split("\\."); + if (dbs != null && dbs.length == 2) { + ns = dbs[0]; + tName = dbs[1]; + } + tableCFBuilder.setTableName( + ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); + + // 3 parse "cf1,cf2" part to List + if (pair.length == 2) { + String[] cfsList = pair[1].split(","); + for (String cf : cfsList) { + String cfName = cf.trim(); + if (cfName.length() > 0) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); + } + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]); + } + + /** + * Convert TableCFs Object to String. + * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 + * */ + public static String convert(ZooKeeperProtos.TableCF[] tableCFs) { + StringBuilder sb = new StringBuilder(); + for (int i = 0, n = tableCFs.length; i < n; i++) { + ZooKeeperProtos.TableCF tableCF = tableCFs[i]; + String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); + if (!Strings.isEmpty(namespace)) { + sb.append(namespace).append("."). + append(tableCF.getTableName().getQualifier().toStringUtf8()) + .append(":"); + } else { + sb.append(tableCF.getTableName().toString()).append(":"); + } + for (int j = 0; j < tableCF.getFamiliesCount(); j++) { + sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); + } + sb.deleteCharAt(sb.length() - 1).append(";"); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + /** + * Get TableCF in TableCFs, if not exist, return null. + * */ + public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs, + String table) { + for (int i = 0, n = tableCFs.length; i < n; i++) { + ZooKeeperProtos.TableCF tableCF = tableCFs[i]; + if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { + return tableCF; + } + } + return null; + } + + /** + * Parse bytes into TableCFs. + * It is used for backward compatibility. + * Old format bytes have no PB_MAGIC Header + * */ + public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + return ReplicationSerDeHelper.convert(Bytes.toString(bytes)); + } + + /** + * Convert tableCFs string into Map. + * */ + public static Map> parseTableCFsFromConfig(String tableCFsConfig) { + ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig); + return convert2Map(tableCFs); + } + + /** + * Convert tableCFs Object to Map. 
+ * */ + public static Map> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) { + if (tableCFs == null || tableCFs.length == 0) { + return null; + } + Map> tableCFsMap = new HashMap>(); + for (int i = 0, n = tableCFs.length; i < n; i++) { + ZooKeeperProtos.TableCF tableCF = tableCFs[i]; + List families = new ArrayList<>(); + for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) { + families.add(tableCF.getFamilies(j).toStringUtf8()); + } + if (families.size() > 0) { + tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families); + } else { + tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null); + } + } + + return tableCFsMap; + } + + /** + * @param bytes Content of a peer znode. + * @return ClusterKey parsed from the passed bytes. + * @throws DeserializationException + */ + public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) + throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationPeer.Builder builder = + ZooKeeperProtos.ReplicationPeer.newBuilder(); + ZooKeeperProtos.ReplicationPeer peer; + try { + ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + peer = builder.build(); + } catch (IOException e) { + throw new DeserializationException(e); + } + return convert(peer); + } else { + if (bytes.length > 0) { + return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); + } + return new ReplicationPeerConfig().setClusterKey(""); + } + } + + public static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) { + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); + if (peer.hasClusterkey()) { + peerConfig.setClusterKey(peer.getClusterkey()); + } + if (peer.hasReplicationEndpointImpl()) { + peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); + } + + for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) { + peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); + } + + for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { + peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); + } + Map> tableCFsMap = convert2Map( + peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()])); + if (tableCFsMap != null) { + peerConfig.setTableCFsMap(tableCFsMap); + } + return peerConfig; + } + + public static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { + ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder(); + if (peerConfig.getClusterKey() != null) { + builder.setClusterkey(peerConfig.getClusterKey()); + } + if (peerConfig.getReplicationEndpointImpl() != null) { + builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); + } + + for (Map.Entry entry : peerConfig.getPeerData().entrySet()) { + builder.addData(HBaseProtos.BytesBytesPair.newBuilder() + .setFirst(ByteString.copyFrom(entry.getKey())) + .setSecond(ByteString.copyFrom(entry.getValue())) + .build()); + } + + for (Map.Entry entry : peerConfig.getConfiguration().entrySet()) { + builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder() + .setName(entry.getKey()) + .setValue(entry.getValue()) + .build()); + } + ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); + if (tableCFs != null) { + for (int i = 0; i < tableCFs.length; i++) { + builder.addTableCfs(tableCFs[i]); + } + } + return builder.build(); + } + + 
/** + * @param peerConfig + * @return Serialized protobuf of peerConfig with pb magic prefix prepended suitable + * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under + * /hbase/replication/peers/PEER_ID + */ + public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { + byte[] bytes = convert(peerConfig).toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index b8b5b22..920eea6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; + /** * ReplicationPeer manages enabled / disabled state for the peer. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 043b38f..8d05fa0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hbase.replication; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -37,6 +41,7 @@ public class ReplicationPeerConfig { private String replicationEndpointImpl; private final Map peerData; private final Map configuration; + private Map> tableCFsMap = null; public ReplicationPeerConfig() { @@ -78,10 +83,19 @@ public class ReplicationPeerConfig { return configuration; } + public Map> getTableCFsMap() { + return (Map>) tableCFsMap; + } + + public void setTableCFsMap(Map> tableCFsMap) { + this.tableCFsMap = tableCFsMap; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); - builder.append("replicationEndpointImpl=").append(replicationEndpointImpl); + builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",") + .append("tableCFs=").append(tableCFsMap.toString()); return builder.toString(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 39f6ebc..f7a2411 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -30,11 +30,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -42,17 +41,18 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; @InterfaceAudience.Private -public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable { +public class ReplicationPeerZKImpl extends ReplicationStateZKBase + implements ReplicationPeer, Abortable, Closeable { private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); - private final ReplicationPeerConfig peerConfig; + private ReplicationPeerConfig peerConfig; private final String id; private volatile PeerState peerState; private volatile Map> tableCFs = new HashMap>(); private final Configuration conf; - private PeerStateTracker peerStateTracker; - private TableCFsTracker tableCFsTracker; + private PeerConfigTracker peerConfigTracker; + /** * Constructor that takes all the objects required to communicate with the specified peer, except @@ -61,39 +61,25 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer */ - public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig) + public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, + String id, ReplicationPeerConfig peerConfig, + Abortable abortable) throws ReplicationException { + super(zkWatcher, conf, abortable); this.conf = conf; this.peerConfig = peerConfig; this.id = id; } - - /** - * Constructor that takes all the objects required to communicate with the specified peer, except - * for the region server addresses. 
- * @param conf configuration object to this peer - * @param id string representation of this peer's identifier - * @param peerConfig configuration for the replication peer - * @param tableCFs table-cf configuration for this peer - */ - public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, - Map> tableCFs) throws ReplicationException { - this.conf = conf; - this.peerConfig = peerConfig; - this.id = id; - this.tableCFs = tableCFs; - } /** * start a state tracker to check whether this peer is enabled or not * - * @param zookeeper zk watcher for the local cluster * @param peerStateNode path to zk node which stores peer state * @throws KeeperException */ - public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) + public void startStateTracker(String peerStateNode) throws KeeperException { - ensurePeerEnabled(zookeeper, peerStateNode); + ensurePeerEnabled(peerStateNode); this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); this.peerStateTracker.start(); try { @@ -112,22 +98,26 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea /** * start a table-cfs tracker to listen the (table, cf-list) map change - * - * @param zookeeper zk watcher for the local cluster - * @param tableCFsNode path to zk node which stores table-cfs + * @param peerConfigNode path to zk node which stores table-cfs * @throws KeeperException */ - public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode) + public void startPeerConfigTracker(String peerConfigNode) throws KeeperException { - this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper, + this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, this); - this.tableCFsTracker.start(); - this.readTableCFsZnode(); + this.peerConfigTracker.start(); + this.readPeerConfig(); } - private void readTableCFsZnode() { - String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); - this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs); + private void readPeerConfig() { + try { + byte[] data = peerConfigTracker.getData(false); + if (data != null) { + this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data); + } + } catch (DeserializationException e) { + LOG.error("", e); + } } @Override @@ -168,6 +158,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea */ @Override public Map> getTableCFs() { + this.tableCFs = peerConfig.getTableCFsMap(); return this.tableCFs; } @@ -223,13 +214,12 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea /** * Utility method to ensure an ENABLED znode is in place; if not present, we create it. - * @param zookeeper * @param path Path to znode to check * @return True if we created the znode. 
* @throws NodeExistsException * @throws KeeperException */ - private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path) + private boolean ensurePeerEnabled(final String path) throws NodeExistsException, KeeperException { if (ZKUtil.checkExists(zookeeper, path) == -1) { // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the @@ -266,20 +256,20 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea } /** - * Tracker for (table, cf-list) map of this peer + * Tracker for PeerConfigNode of this peer */ - public class TableCFsTracker extends ZooKeeperNodeTracker { + public class PeerConfigTracker extends ZooKeeperNodeTracker { - public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, Abortable abortable) { - super(watcher, tableCFsZNode, abortable); + super(watcher, peerConfigNode, abortable); } @Override public synchronized void nodeCreated(String path) { if (path.equals(node)) { super.nodeCreated(path); - readTableCFsZnode(); + readPeerConfig(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 8bf21d5..1961a65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -49,10 +50,8 @@ public interface ReplicationPeers { * Add a new remote slave cluster for replication. * @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster - * @param tableCFs the table and column-family list which will be replicated for this peer or null - * for all table and column families */ - void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs) + void addPeer(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException; /** @@ -81,21 +80,17 @@ public interface ReplicationPeers { * Get the table and column-family list string of the peer from ZK. * @param peerId a short that identifies the cluster */ - public String getPeerTableCFsConfig(String peerId) throws ReplicationException; + public Map> getPeerTableCFsConfig(String peerId) + throws ReplicationException; /** * Set the table and column-family list string of the peer to ZK. * @param peerId a short that identifies the cluster * @param tableCFs the table and column-family list which will be replicated for this peer */ - public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException; - - /** - * Get the table and column-family-list map of the peer. 
- * @param peerId a short that identifies the cluster - * @return the table and column-family list which will be replicated for this peer - */ - public Map> getTableCFs(String peerId); + public void setPeerTableCFsConfig(String peerId, + Map> tableCFs) + throws ReplicationException; /** * Returns the ReplicationPeer http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index e14f2c6..367c688 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,19 +30,16 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; @@ -49,8 +47,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; -import com.google.protobuf.ByteString; - /** * This class provides an implementation of the ReplicationPeers interface using Zookeeper. 
The * peers znode contains a list of all peer replication clusters and the current replication state of @@ -82,15 +78,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Map of peer clusters keyed by their id private Map peerClusters; - private final String tableCFsNodeName; private final ReplicationQueuesClient queuesClient; + private Abortable abortable; private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, final ReplicationQueuesClient queuesClient, Abortable abortable) { super(zk, conf, abortable); - this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); + this.abortable = abortable; this.peerClusters = new ConcurrentHashMap(); this.queuesClient = queuesClient; } @@ -108,7 +104,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs) + public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { try { if (peerExists(id)) { @@ -136,18 +132,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } List listOfOps = new ArrayList(); - ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), - toByteArray(peerConfig)); + ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), + ReplicationSerDeHelper.toByteArray(peerConfig)); // There is a race (if hbase.zookeeper.useMulti is false) // b/w PeerWatcher and ReplicationZookeeper#add method to create the // peer-state znode. This happens while adding a peer // The peer state data is set as "ENABLED" by default. ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES); - String tableCFsStr = (tableCFs == null) ? 
"" : tableCFs; - ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr)); listOfOps.add(op1); listOfOps.add(op2); - listOfOps.add(op3); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); // A peer is enabled by default } catch (KeeperException e) { @@ -192,13 +185,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public String getPeerTableCFsConfig(String id) throws ReplicationException { + public Map> getPeerTableCFsConfig(String id) throws ReplicationException { try { if (!peerExists(id)) { throw new IllegalArgumentException("peer " + id + " doesn't exist"); } try { - return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id))); + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); + } + return rpc.getTableCFsMap(); } catch (Exception e) { throw new ReplicationException(e); } @@ -208,35 +205,29 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException { + public void setPeerTableCFsConfig(String id, + Map> tableCFs) + throws ReplicationException { try { if (!peerExists(id)) { throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + " does not exist."); } - String tableCFsZKNode = getTableCFsNode(id); - byte[] tableCFs = Bytes.toBytes(tableCFsStr); - if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) { - ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs); - } else { - ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs); + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); } - LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr); + rpc.setTableCFsMap(tableCFs); + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(rpc)); + LOG.info("Peer tableCFs with id= " + id + " is now " + + ReplicationSerDeHelper.convertToString(tableCFs)); } catch (KeeperException e) { throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); } } @Override - public Map> getTableCFs(String id) throws IllegalArgumentException { - ReplicationPeer replicationPeer = this.peerClusters.get(id); - if (replicationPeer == null) { - throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); - } - return replicationPeer.getTableCFs(); - } - - @Override public boolean getStatusOfPeer(String id) { ReplicationPeer replicationPeer = this.peerClusters.get(id); if (replicationPeer == null) { @@ -306,7 +297,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException { - String znode = ZKUtil.joinZNode(this.peersZNode, peerId); + String znode = getPeerNode(peerId); byte[] data = null; try { data = ZKUtil.getData(this.zookeeper, znode); @@ -325,7 +316,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } try { - return parsePeerFrom(data); + return ReplicationSerDeHelper.parsePeerFrom(data); } catch (DeserializationException e) { LOG.warn("Failed to parse cluster key from peerId=" + peerId + ", specifically the content from the following znode: " + znode); @@ 
-438,14 +429,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return true; } - private String getTableCFsNode(String id) { - return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName)); - } - - private String getPeerStateNode(String id) { - return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); - } - /** * Update the state znode of a peer cluster. * @param id @@ -486,16 +469,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } Configuration peerConf = pair.getSecond(); - ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, + peerConf, peerId, pair.getFirst(), abortable); try { - peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId)); + peer.startStateTracker(this.getPeerStateNode(peerId)); } catch (KeeperException e) { throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId, e); } try { - peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId)); + peer.startPeerConfigTracker(this.getPeerNode(peerId)); } catch (KeeperException e) { throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + peerId, e); @@ -504,89 +488,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return peer; } - /** - * @param bytes Content of a peer znode. - * @return ClusterKey parsed from the passed bytes. - * @throws DeserializationException - */ - private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) - throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationPeer.Builder builder = - ZooKeeperProtos.ReplicationPeer.newBuilder(); - ZooKeeperProtos.ReplicationPeer peer; - try { - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); - peer = builder.build(); - } catch (IOException e) { - throw new DeserializationException(e); - } - return convert(peer); - } else { - if (bytes.length > 0) { - return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); - } - return new ReplicationPeerConfig().setClusterKey(""); - } - } - - private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) { - ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); - if (peer.hasClusterkey()) { - peerConfig.setClusterKey(peer.getClusterkey()); - } - if (peer.hasReplicationEndpointImpl()) { - peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); - } - - for (BytesBytesPair pair : peer.getDataList()) { - peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); - } - - for (NameStringPair pair : peer.getConfigurationList()) { - peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); - } - return peerConfig; - } - - private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { - ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder(); - if (peerConfig.getClusterKey() != null) { - builder.setClusterkey(peerConfig.getClusterKey()); - } - if (peerConfig.getReplicationEndpointImpl() != null) { - builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); - } - - for (Map.Entry entry : peerConfig.getPeerData().entrySet()) { - builder.addData(BytesBytesPair.newBuilder() - 
.setFirst(ByteString.copyFrom(entry.getKey())) - .setSecond(ByteString.copyFrom(entry.getValue())) - .build()); - } - - for (Map.Entry entry : peerConfig.getConfiguration().entrySet()) { - builder.addConfiguration(NameStringPair.newBuilder() - .setName(entry.getKey()) - .setValue(entry.getValue()) - .build()); - } - - return builder.build(); - } - - /** - * @param peerConfig - * @return Serialized protobuf of peerConfig with pb magic prefix prepended suitable - * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under - * /hbase/replication/peers/PEER_ID - */ - private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { - byte[] bytes = convert(peerConfig).toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - private void checkQueuesDeleted(String peerId) throws ReplicationException { if (queuesClient == null) return; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index a1dc1c8..79853a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -52,6 +53,9 @@ public abstract class ReplicationStateZKBase { protected final String hfileRefsZNode; /** The cluster key of the local cluster */ protected final String ourClusterKey; + /** The name of the znode that contains tableCFs */ + protected final String tableCFsNodeName; + protected final ZooKeeperWatcher zookeeper; protected final Configuration conf; protected final Abortable abortable; @@ -77,6 +81,7 @@ public abstract class ReplicationStateZKBase { String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); @@ -118,4 +123,18 @@ public abstract class ReplicationStateZKBase { protected boolean isPeerPath(String path) { return path.split("/").length == peersZNode.split("/").length + 1; } + + @VisibleForTesting + protected String getTableCFsNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName)); + } + + @VisibleForTesting + protected String getPeerStateNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); + } + @VisibleForTesting + protected String getPeerNode(String id) { + return ZKUtil.joinZNode(this.peersZNode, id); + } }
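To close, a small sketch of the round trip that the new ReplicationSerDeHelper performs between the legacy table-CFs string ("ns.table:cf1,cf2" entries joined by ';') and the typed map now stored in ReplicationPeerConfig. The class is annotated @InterfaceAudience.Private, so this is illustration rather than a supported public API; the table and family names are invented, and the generic types are written out here even though the archived diff above strips angle brackets.

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;

    public class TableCFsRoundTrip {
      public static void main(String[] args) {
        // Legacy string form, as previously stored in the per-peer tableCFs znode.
        String legacy = "ns1.table1:cf1,cf2;table2";

        // String -> map: "table2" lists no families, so it maps to null (all column families).
        Map<TableName, List<String>> tableCfs =
            ReplicationSerDeHelper.parseTableCFsFromConfig(legacy);

        // Map -> string: the form used for log messages and shell output.
        System.out.println(ReplicationSerDeHelper.convertToString(tableCfs));
      }
    }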