Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9E845200C81 for ; Thu, 20 Apr 2017 17:01:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9CF6B160BBA; Thu, 20 Apr 2017 15:01:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9DFD1160BB1 for ; Thu, 20 Apr 2017 17:01:30 +0200 (CEST) Received: (qmail 69002 invoked by uid 500); 20 Apr 2017 15:01:29 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 68425 invoked by uid 99); 20 Apr 2017 15:01:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 15:01:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3F6DFF4A21; Thu, 20 Apr 2017 15:01:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 20 Apr 2017 15:01:31 -0000 Message-Id: In-Reply-To: <4a35df37c6d74f6186c8aef0e7b24bdc@git.apache.org> References: <4a35df37c6d74f6186c8aef0e7b24bdc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Thu, 20 Apr 2017 15:01:34 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/662ea7dc/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.MergeTableRegionsFuture.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.MergeTableRegionsFuture.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.MergeTableRegionsFuture.html index b798d4b..8c56a67 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.MergeTableRegionsFuture.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.MergeTableRegionsFuture.html @@ -3877,425 +3877,371 @@ 3869 throw new ReplicationException("tableCfs is null"); 3870 } 3871 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); -3872 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); -3873 if (preTableCfs == null) { -3874 peerConfig.setTableCFsMap(tableCfs); -3875 } else { -3876 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { -3877 TableName table = entry.getKey(); -3878 Collection<String> appendCfs = entry.getValue(); -3879 if (preTableCfs.containsKey(table)) { -3880 List<String> cfs = preTableCfs.get(table); -3881 if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { -3882 preTableCfs.put(table, null); -3883 } else { -3884 Set<String> cfSet = new HashSet<String>(cfs); -3885 cfSet.addAll(appendCfs); -3886 preTableCfs.put(table, Lists.newArrayList(cfSet)); -3887 } -3888 } else { -3889 if (appendCfs == null || appendCfs.isEmpty()) { -3890 preTableCfs.put(table, null); -3891 } else { -3892 preTableCfs.put(table, Lists.newArrayList(appendCfs)); -3893 } -3894 } -3895 } -3896 } -3897 updateReplicationPeerConfig(id, peerConfig); -3898 } -3899 -3900 @Override -3901 public void removeReplicationPeerTableCFs(String id, -3902 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, -3903 IOException { -3904 if (tableCfs == null) { -3905 throw new ReplicationException("tableCfs is null"); -3906 } -3907 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); -3908 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); -3909 if (preTableCfs == null) { -3910 throw new ReplicationException("Table-Cfs for peer" + id + " is null"); -3911 } -3912 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { -3913 -3914 TableName table = entry.getKey(); -3915 Collection<String> removeCfs = entry.getValue(); -3916 if (preTableCfs.containsKey(table)) { -3917 List<String> cfs = preTableCfs.get(table); -3918 if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { -3919 preTableCfs.remove(table); -3920 } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { -3921 Set<String> cfSet = new HashSet<String>(cfs); -3922 cfSet.removeAll(removeCfs); -3923 if (cfSet.isEmpty()) { -3924 preTableCfs.remove(table); -3925 } else { -3926 preTableCfs.put(table, Lists.newArrayList(cfSet)); -3927 } -3928 } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { -3929 throw new ReplicationException("Cannot remove cf of table: " + table -3930 + " which doesn't specify cfs from table-cfs config in peer: " + id); -3931 } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { -3932 throw new ReplicationException("Cannot remove table: " + table -3933 + " which has specified cfs from table-cfs config in peer: " + id); -3934 } -3935 } else { -3936 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); -3937 } -3938 } -3939 updateReplicationPeerConfig(id, peerConfig); -3940 } -3941 -3942 @Override -3943 public List<ReplicationPeerDescription> listReplicationPeers() throws IOException { -3944 return listReplicationPeers((Pattern)null); -3945 } -3946 -3947 @Override -3948 public List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException { -3949 return listReplicationPeers(Pattern.compile(regex)); -3950 } -3951 -3952 @Override -3953 public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) -3954 throws IOException { -3955 return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(), -3956 getRpcControllerFactory()) { -3957 @Override -3958 protected List<ReplicationPeerDescription> rpcCall() throws Exception { -3959 List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers( -3960 getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern)) -3961 .getPeerDescList(); -3962 List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size()); -3963 for (ReplicationProtos.ReplicationPeerDescription peer : peersList) { -3964 result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer)); -3965 } -3966 return result; -3967 } -3968 }); -3969 } -3970 -3971 @Override -3972 public void drainRegionServers(List<ServerName> servers) throws IOException { -3973 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); -3974 for (ServerName server : servers) { -3975 // Parse to ServerName to do simple validation. -3976 ServerName.parseServerName(server.toString()); -3977 pbServers.add(ProtobufUtil.toServerName(server)); -3978 } -3979 -3980 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { -3981 @Override -3982 public Void rpcCall() throws ServiceException { -3983 DrainRegionServersRequest req = -3984 DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build(); -3985 master.drainRegionServers(getRpcController(), req); -3986 return null; -3987 } -3988 }); +3872 ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); +3873 updateReplicationPeerConfig(id, peerConfig); +3874 } +3875 +3876 @Override +3877 public void removeReplicationPeerTableCFs(String id, +3878 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, +3879 IOException { +3880 if (tableCfs == null) { +3881 throw new ReplicationException("tableCfs is null"); +3882 } +3883 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); +3884 ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); +3885 updateReplicationPeerConfig(id, peerConfig); +3886 } +3887 +3888 @Override +3889 public List<ReplicationPeerDescription> listReplicationPeers() throws IOException { +3890 return listReplicationPeers((Pattern)null); +3891 } +3892 +3893 @Override +3894 public List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException { +3895 return listReplicationPeers(Pattern.compile(regex)); +3896 } +3897 +3898 @Override +3899 public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) +3900 throws IOException { +3901 return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(), +3902 getRpcControllerFactory()) { +3903 @Override +3904 protected List<ReplicationPeerDescription> rpcCall() throws Exception { +3905 List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers( +3906 getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern)) +3907 .getPeerDescList(); +3908 List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size()); +3909 for (ReplicationProtos.ReplicationPeerDescription peer : peersList) { +3910 result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer)); +3911 } +3912 return result; +3913 } +3914 }); +3915 } +3916 +3917 @Override +3918 public void drainRegionServers(List<ServerName> servers) throws IOException { +3919 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); +3920 for (ServerName server : servers) { +3921 // Parse to ServerName to do simple validation. +3922 ServerName.parseServerName(server.toString()); +3923 pbServers.add(ProtobufUtil.toServerName(server)); +3924 } +3925 +3926 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { +3927 @Override +3928 public Void rpcCall() throws ServiceException { +3929 DrainRegionServersRequest req = +3930 DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build(); +3931 master.drainRegionServers(getRpcController(), req); +3932 return null; +3933 } +3934 }); +3935 } +3936 +3937 @Override +3938 public List<ServerName> listDrainingRegionServers() throws IOException { +3939 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), +3940 getRpcControllerFactory()) { +3941 @Override +3942 public List<ServerName> rpcCall() throws ServiceException { +3943 ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build(); +3944 List<ServerName> servers = new ArrayList<>(); +3945 for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req) +3946 .getServerNameList()) { +3947 servers.add(ProtobufUtil.toServerName(server)); +3948 } +3949 return servers; +3950 } +3951 }); +3952 } +3953 +3954 @Override +3955 public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { +3956 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); +3957 for (ServerName server : servers) { +3958 pbServers.add(ProtobufUtil.toServerName(server)); +3959 } +3960 +3961 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { +3962 @Override +3963 public Void rpcCall() throws ServiceException { +3964 RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder() +3965 .addAllServerName(pbServers).build(); +3966 master.removeDrainFromRegionServers(getRpcController(), req); +3967 return null; +3968 } +3969 }); +3970 } +3971 +3972 @Override +3973 public List<TableCFs> listReplicatedTableCFs() throws IOException { +3974 List<TableCFs> replicatedTableCFs = new ArrayList<>(); +3975 HTableDescriptor[] tables = listTables(); +3976 for (HTableDescriptor table : tables) { +3977 HColumnDescriptor[] columns = table.getColumnFamilies(); +3978 Map<String, Integer> cfs = new HashMap<>(); +3979 for (HColumnDescriptor column : columns) { +3980 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { +3981 cfs.put(column.getNameAsString(), column.getScope()); +3982 } +3983 } +3984 if (!cfs.isEmpty()) { +3985 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); +3986 } +3987 } +3988 return replicatedTableCFs; 3989 } 3990 3991 @Override -3992 public List<ServerName> listDrainingRegionServers() throws IOException { -3993 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), -3994 getRpcControllerFactory()) { -3995 @Override -3996 public List<ServerName> rpcCall() throws ServiceException { -3997 ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build(); -3998 List<ServerName> servers = new ArrayList<>(); -3999 for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req) -4000 .getServerNameList()) { -4001 servers.add(ProtobufUtil.toServerName(server)); -4002 } -4003 return servers; -4004 } -4005 }); -4006 } -4007 -4008 @Override -4009 public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { -4010 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); -4011 for (ServerName server : servers) { -4012 pbServers.add(ProtobufUtil.toServerName(server)); +3992 public void enableTableReplication(final TableName tableName) throws IOException { +3993 if (tableName == null) { +3994 throw new IllegalArgumentException("Table name cannot be null"); +3995 } +3996 if (!tableExists(tableName)) { +3997 throw new TableNotFoundException("Table '" + tableName.getNameAsString() +3998 + "' does not exists."); +3999 } +4000 byte[][] splits = getTableSplits(tableName); +4001 checkAndSyncTableDescToPeers(tableName, splits); +4002 setTableRep(tableName, true); +4003 } +4004 +4005 @Override +4006 public void disableTableReplication(final TableName tableName) throws IOException { +4007 if (tableName == null) { +4008 throw new IllegalArgumentException("Table name is null"); +4009 } +4010 if (!tableExists(tableName)) { +4011 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() +4012 + "' does not exists."); 4013 } -4014 -4015 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { -4016 @Override -4017 public Void rpcCall() throws ServiceException { -4018 RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder() -4019 .addAllServerName(pbServers).build(); -4020 master.removeDrainFromRegionServers(getRpcController(), req); -4021 return null; -4022 } -4023 }); -4024 } -4025 -4026 @Override -4027 public List<TableCFs> listReplicatedTableCFs() throws IOException { -4028 List<TableCFs> replicatedTableCFs = new ArrayList<>(); -4029 HTableDescriptor[] tables = listTables(); -4030 for (HTableDescriptor table : tables) { -4031 HColumnDescriptor[] columns = table.getColumnFamilies(); -4032 Map<String, Integer> cfs = new HashMap<>(); -4033 for (HColumnDescriptor column : columns) { -4034 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { -4035 cfs.put(column.getNameAsString(), column.getScope()); -4036 } -4037 } -4038 if (!cfs.isEmpty()) { -4039 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); -4040 } -4041 } -4042 return replicatedTableCFs; -4043 } -4044 -4045 @Override -4046 public void enableTableReplication(final TableName tableName) throws IOException { -4047 if (tableName == null) { -4048 throw new IllegalArgumentException("Table name cannot be null"); -4049 } -4050 if (!tableExists(tableName)) { -4051 throw new TableNotFoundException("Table '" + tableName.getNameAsString() -4052 + "' does not exists."); -4053 } -4054 byte[][] splits = getTableSplits(tableName); -4055 checkAndSyncTableDescToPeers(tableName, splits); -4056 setTableRep(tableName, true); -4057 } -4058 -4059 @Override -4060 public void disableTableReplication(final TableName tableName) throws IOException { -4061 if (tableName == null) { -4062 throw new IllegalArgumentException("Table name is null"); -4063 } -4064 if (!tableExists(tableName)) { -4065 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() -4066 + "' does not exists."); -4067 } -4068 setTableRep(tableName, false); -4069 } -4070 -4071 /** -4072 * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method -4073 * ensures that the name of table and column-families should match. -4074 * @param peerHtd descriptor on peer cluster -4075 * @param localHtd - The HTableDescriptor of table from source cluster. -4076 * @return true If the name of table and column families match and REPLICATION_SCOPE copied -4077 * successfully. false If there is any mismatch in the names. -4078 */ -4079 private boolean copyReplicationScope(final HTableDescriptor peerHtd, -4080 final HTableDescriptor localHtd) { -4081 // Copy the REPLICATION_SCOPE only when table names and the names of -4082 // Column-Families are same. -4083 int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); +4014 setTableRep(tableName, false); +4015 } +4016 +4017 /** +4018 * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method +4019 * ensures that the name of table and column-families should match. +4020 * @param peerHtd descriptor on peer cluster +4021 * @param localHtd - The HTableDescriptor of table from source cluster. +4022 * @return true If the name of table and column families match and REPLICATION_SCOPE copied +4023 * successfully. false If there is any mismatch in the names. +4024 */ +4025 private boolean copyReplicationScope(final HTableDescriptor peerHtd, +4026 final HTableDescriptor localHtd) { +4027 // Copy the REPLICATION_SCOPE only when table names and the names of +4028 // Column-Families are same. +4029 int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); +4030 +4031 if (result == 0) { +4032 Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator(); +4033 Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator(); +4034 +4035 while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { +4036 HColumnDescriptor remoteHCD = remoteHCDIter.next(); +4037 HColumnDescriptor localHCD = localHCDIter.next(); +4038 +4039 String remoteHCDName = remoteHCD.getNameAsString(); +4040 String localHCDName = localHCD.getNameAsString(); +4041 +4042 if (remoteHCDName.equals(localHCDName)) { +4043 remoteHCD.setScope(localHCD.getScope()); +4044 } else { +4045 result = -1; +4046 break; +4047 } +4048 } +4049 if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { +4050 return false; +4051 } +4052 } +4053 +4054 return result == 0; +4055 } +4056 +4057 /** +4058 * Compare the contents of the descriptor with another one passed as a parameter for replication +4059 * purpose. The REPLICATION_SCOPE field is ignored during comparison. +4060 * @param peerHtd descriptor on peer cluster +4061 * @param localHtd descriptor on source cluster which needs to be replicated. +4062 * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). +4063 * @see java.lang.Object#equals(java.lang.Object) +4064 */ +4065 private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { +4066 if (peerHtd == localHtd) { +4067 return true; +4068 } +4069 if (peerHtd == null) { +4070 return false; +4071 } +4072 boolean result = false; +4073 +4074 // Create a copy of peer HTD as we need to change its replication +4075 // scope to match with the local HTD. +4076 HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); +4077 +4078 result = copyReplicationScope(peerHtdCopy, localHtd); +4079 +4080 // If copy was successful, compare the two tables now. +4081 if (result) { +4082 result = (peerHtdCopy.compareTo(localHtd) == 0); +4083 } 4084 -4085 if (result == 0) { -4086 Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator(); -4087 Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator(); -4088 -4089 while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { -4090 HColumnDescriptor remoteHCD = remoteHCDIter.next(); -4091 HColumnDescriptor localHCD = localHCDIter.next(); -4092 -4093 String remoteHCDName = remoteHCD.getNameAsString(); -4094 String localHCDName = localHCD.getNameAsString(); -4095 -4096 if (remoteHCDName.equals(localHCDName)) { -4097 remoteHCD.setScope(localHCD.getScope()); -4098 } else { -4099 result = -1; -4100 break; -4101 } -4102 } -4103 if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { -4104 return false; -4105 } -4106 } -4107 -4108 return result == 0; -4109 } -4110 -4111 /** -4112 * Compare the contents of the descriptor with another one passed as a parameter for replication -4113 * purpose. The REPLICATION_SCOPE field is ignored during comparison. -4114 * @param peerHtd descriptor on peer cluster -4115 * @param localHtd descriptor on source cluster which needs to be replicated. -4116 * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). -4117 * @see java.lang.Object#equals(java.lang.Object) -4118 */ -4119 private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { -4120 if (peerHtd == localHtd) { -4121 return true; -4122 } -4123 if (peerHtd == null) { -4124 return false; -4125 } -4126 boolean result = false; -4127 -4128 // Create a copy of peer HTD as we need to change its replication -4129 // scope to match with the local HTD. -4130 HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); -4131 -4132 result = copyReplicationScope(peerHtdCopy, localHtd); +4085 return result; +4086 } +4087 +4088 /** +4089 * Connect to peer and check the table descriptor on peer: +4090 * <ol> +4091 * <li>Create the same table on peer when not exist.</li> +4092 * <li>Throw an exception if the table already has replication enabled on any of the column +4093 * families.</li> +4094 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> +4095 * </ol> +4096 * @param tableName name of the table to sync to the peer +4097 * @param splits table split keys +4098 * @throws IOException +4099 */ +4100 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) +4101 throws IOException { +4102 List<ReplicationPeerDescription> peers = listReplicationPeers(); +4103 if (peers == null || peers.size() <= 0) { +4104 throw new IllegalArgumentException("Found no peer cluster for replication."); +4105 } +4106 +4107 for (ReplicationPeerDescription peerDesc : peers) { +4108 if (needToReplicate(tableName, peerDesc)) { +4109 Configuration peerConf = getPeerClusterConfiguration(peerDesc); +4110 try (Connection conn = ConnectionFactory.createConnection(peerConf); +4111 Admin repHBaseAdmin = conn.getAdmin()) { +4112 HTableDescriptor localHtd = getTableDescriptor(tableName); +4113 HTableDescriptor peerHtd = null; +4114 if (!repHBaseAdmin.tableExists(tableName)) { +4115 repHBaseAdmin.createTable(localHtd, splits); +4116 } else { +4117 peerHtd = repHBaseAdmin.getTableDescriptor(tableName); +4118 if (peerHtd == null) { +4119 throw new IllegalArgumentException("Failed to get table descriptor for table " +4120 + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); +4121 } +4122 if (!compareForReplication(peerHtd, localHtd)) { +4123 throw new IllegalArgumentException("Table " + tableName.getNameAsString() +4124 + " exists in peer cluster " + peerDesc.getPeerId() +4125 + ", but the table descriptors are not same when compared with source cluster." +4126 + " Thus can not enable the table's replication switch."); +4127 } +4128 } +4129 } +4130 } +4131 } +4132 } 4133 -4134 // If copy was successful, compare the two tables now. -4135 if (result) { -4136 result = (peerHtdCopy.compareTo(localHtd) == 0); -4137 } -4138 -4139 return result; -4140 } -4141 -4142 /** -4143 * Connect to peer and check the table descriptor on peer: -4144 * <ol> -4145 * <li>Create the same table on peer when not exist.</li> -4146 * <li>Throw an exception if the table already has replication enabled on any of the column -4147 * families.</li> -4148 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> -4149 * </ol> -4150 * @param tableName name of the table to sync to the peer -4151 * @param splits table split keys -4152 * @throws IOException -4153 */ -4154 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) -4155 throws IOException { -4156 List<ReplicationPeerDescription> peers = listReplicationPeers(); -4157 if (peers == null || peers.size() <= 0) { -4158 throw new IllegalArgumentException("Found no peer cluster for replication."); -4159 } +4134 /** +4135 * Decide whether the table need replicate to the peer cluster according to the peer config +4136 * @param table name of the table +4137 * @param peerConfig config for the peer +4138 * @return true if the table need replicate to the peer cluster +4139 */ +4140 private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { +4141 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); +4142 Set<String> namespaces = peerConfig.getNamespaces(); +4143 Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap(); +4144 // If null means user has explicitly not configured any namespaces and table CFs +4145 // so all the tables data are applicable for replication +4146 if (namespaces == null && tableCFsMap == null) { +4147 return true; +4148 } +4149 if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { +4150 return true; +4151 } +4152 if (tableCFsMap != null && tableCFsMap.containsKey(table)) { +4153 return true; +4154 } +4155 LOG.debug("Table " + table.getNameAsString() +4156 + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" +4157 + peerConfig.getClusterKey()); +4158 return false; +4159 } 4160 -4161 for (ReplicationPeerDescription peerDesc : peers) { -4162 if (needToReplicate(tableName, peerDesc)) { -4163 Configuration peerConf = getPeerClusterConfiguration(peerDesc); -4164 try (Connection conn = ConnectionFactory.createConnection(peerConf); -4165 Admin repHBaseAdmin = conn.getAdmin()) { -4166 HTableDescriptor localHtd = getTableDescriptor(tableName); -4167 HTableDescriptor peerHtd = null; -4168 if (!repHBaseAdmin.tableExists(tableName)) { -4169 repHBaseAdmin.createTable(localHtd, splits); -4170 } else { -4171 peerHtd = repHBaseAdmin.getTableDescriptor(tableName); -4172 if (peerHtd == null) { -4173 throw new IllegalArgumentException("Failed to get table descriptor for table " -4174 + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); -4175 } -4176 if (!compareForReplication(peerHtd, localHtd)) { -4177 throw new IllegalArgumentException("Table " + tableName.getNameAsString() -4178 + " exists in peer cluster " + peerDesc.getPeerId() -4179 + ", but the table descriptors are not same when compared with source cluster." -4180 + " Thus can not enable the table's replication switch."); -4181 } -4182 } -4183 } -4184 } -4185 } -4186 } -4187 -4188 /** -4189 * Decide whether the table need replicate to the peer cluster according to the peer config -4190 * @param table name of the table -4191 * @param peerConfig config for the peer -4192 * @return true if the table need replicate to the peer cluster -4193 */ -4194 private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { -4195 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); -4196 Set<String> namespaces = peerConfig.getNamespaces(); -4197 Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap(); -4198 // If null means user has explicitly not configured any namespaces and table CFs -4199 // so all the tables data are applicable for replication -4200 if (namespaces == null && tableCFsMap == null) { -4201 return true; -4202 } -4203 if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { -4204 return true; -4205 } -4206 if (tableCFsMap != null && tableCFsMap.containsKey(table)) { -4207 return true; -4208 } -4209 LOG.debug("Table " + table.getNameAsString() -4210 + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" -4211 + peerConfig.getClusterKey()); -4212 return false; -4213 } -4214 -4215 /** -4216 * Set the table's replication switch if the table's replication switch is already not set. -4217 * @param tableName name of the table -4218 * @param enableRep is replication switch enable or disable -4219 * @throws IOException if a remote or network exception occurs -4220 */ -4221 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { -4222 HTableDescriptor htd = getTableDescriptor(tableName); -4223 ReplicationState currentReplicationState = getTableReplicationState(htd); -4224 if (enableRep && currentReplicationState != ReplicationState.ENABLED -4225 || !enableRep && currentReplicationState != ReplicationState.DISABLED) { -4226 for (HColumnDescriptor hcd : htd.getFamilies()) { -4227 hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL -4228 : HConstants.REPLICATION_SCOPE_LOCAL); -4229 } -4230 modifyTable(tableName, htd); -4231 } -4232 } +4161 /** +4162 * Set the table's replication switch if the table's replication switch is already not set. +4163 * @param tableName name of the table +4164 * @param enableRep is replication switch enable or disable +4165 * @throws IOException if a remote or network exception occurs +4166 */ +4167 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { +4168 HTableDescriptor htd = getTableDescriptor(tableName); +4169 ReplicationState currentReplicationState = getTableReplicationState(htd); +4170 if (enableRep && currentReplicationState != ReplicationState.ENABLED +4171 || !enableRep && currentReplicationState != ReplicationState.DISABLED) { +4172 for (HColumnDescriptor hcd : htd.getFamilies()) { +4173 hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL +4174 : HConstants.REPLICATION_SCOPE_LOCAL); +4175 } +4176 modifyTable(tableName, htd); +4177 } +4178 } +4179 +4180 /** +4181 * This enum indicates the current state of the replication for a given table. +4182 */ +4183 private enum ReplicationState { +4184 ENABLED, // all column families enabled +4185 MIXED, // some column families enabled, some disabled +4186 DISABLED // all column families disabled +4187 } +4188 +4189 /** +4190 * @param htd table descriptor details for the table to check +4191 * @return ReplicationState the current state of the table. +4192 */ +4193 private ReplicationState getTableReplicationState(HTableDescriptor htd) { +4194 boolean hasEnabled = false; +4195 boolean hasDisabled = false; +4196 +4197 for (HColumnDescriptor hcd : htd.getFamilies()) { +4198 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL +4199 && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { +4200 hasDisabled = true; +4201 } else { +4202 hasEnabled = true; +4203 } +4204 } +4205 +4206 if (hasEnabled && hasDisabled) return ReplicationState.MIXED; +4207 if (hasEnabled) return ReplicationState.ENABLED; +4208 return ReplicationState.DISABLED; +4209 } +4210 +4211 /** +4212 * Returns the configuration needed to talk to the remote slave cluster. +4213 * @param peer the description of replication peer +4214 * @return the configuration for the peer cluster, null if it was unable to get the configuration +4215 * @throws IOException +4216 */ +4217 private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) +4218 throws IOException { +4219 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); +4220 Configuration otherConf; +4221 try { +4222 otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); +4223 } catch (IOException e) { +4224 throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); +4225 } +4226 +4227 if (!peerConfig.getConfiguration().isEmpty()) { +4228 CompoundConfiguration compound = new CompoundConfiguration(); +4229 compound.add(otherConf); +4230 compound.addStringMap(peerConfig.getConfiguration()); +4231 return compound; +4232 } 4233 -4234 /** -4235 * This enum indicates the current state of the replication for a given table. -4236 */ -4237 private enum ReplicationState { -4238 ENABLED, // all column families enabled -4239 MIXED, // some column families enabled, some disabled -4240 DISABLED // all column families disabled -4241 } -4242 -4243 /** -4244 * @param htd table descriptor details for the table to check -4245 * @return ReplicationState the current state of the table. -4246 */ -4247 private ReplicationState getTableReplicationState(HTableDescriptor htd) { -4248 boolean hasEnabled = false; -4249 boolean hasDisabled = false; -4250 -4251 for (HColumnDescriptor hcd : htd.getFamilies()) { -4252 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL -4253 && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { -4254 hasDisabled = true; -4255 } else { -4256 hasEnabled = true; -4257 } -4258 } -4259 -4260 if (hasEnabled && hasDisabled) return ReplicationState.MIXED; -4261 if (hasEnabled) return ReplicationState.ENABLED; -4262 return ReplicationState.DISABLED; -4263 } -4264 -4265 /** -4266 * Returns the configuration needed to talk to the remote slave cluster. -4267 * @param peer the description of replication peer -4268 * @return the configuration for the peer cluster, null if it was unable to get the configuration -4269 * @throws IOException -4270 */ -4271 private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) -4272 throws IOException { -4273 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); -4274 Configuration otherConf; -4275 try { -4276 otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); -4277 } catch (IOException e) { -4278 throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); -4279 } -4280 -4281 if (!peerConfig.getConfiguration().isEmpty()) { -4282 CompoundConfiguration compound = new CompoundConfiguration(); -4283 compound.add(otherConf); -4284 compound.addStringMap(peerConfig.getConfiguration()); -4285 return compound; -4286 } -4287 -4288 return otherConf; -4289 } -4290} +4234 return otherConf; +4235 } +4236}