Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 70CD4200CE4 for ; Sun, 9 Jul 2017 17:02:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6F6E416A1D2; Sun, 9 Jul 2017 15:02:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7E69216A1A0 for ; Sun, 9 Jul 2017 17:02:02 +0200 (CEST) Received: (qmail 79867 invoked by uid 500); 9 Jul 2017 15:01:57 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 77296 invoked by uid 99); 9 Jul 2017 15:01:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 09 Jul 2017 15:01:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE53BF552D; Sun, 9 Jul 2017 15:01:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Sun, 09 Jul 2017 15:02:25 -0000 Message-Id: In-Reply-To: <74374d495ef643a5be8965892b791015@git.apache.org> References: <74374d495ef643a5be8965892b791015@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [33/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Sun, 09 Jul 2017 15:02:04 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2d27954a/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.DisableTableFuture.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.DisableTableFuture.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.DisableTableFuture.html index f5bc73a..feb42ea 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.DisableTableFuture.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/HBaseAdmin.DisableTableFuture.html @@ -4044,345 +4044,330 @@ 4036 4037 @Override 4038 public void drainRegionServers(List<ServerName> servers) throws IOException { -4039 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); -4040 for (ServerName server : servers) { -4041 // Parse to ServerName to do simple validation. -4042 ServerName.parseServerName(server.toString()); -4043 pbServers.add(ProtobufUtil.toServerName(server)); -4044 } -4045 -4046 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { -4047 @Override -4048 public Void rpcCall() throws ServiceException { -4049 DrainRegionServersRequest req = -4050 DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build(); -4051 master.drainRegionServers(getRpcController(), req); -4052 return null; -4053 } -4054 }); -4055 } -4056 -4057 @Override -4058 public List<ServerName> listDrainingRegionServers() throws IOException { -4059 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), -4060 getRpcControllerFactory()) { -4061 @Override -4062 public List<ServerName> rpcCall() throws ServiceException { -4063 ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build(); -4064 List<ServerName> servers = new ArrayList<>(); -4065 for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req) -4066 .getServerNameList()) { -4067 servers.add(ProtobufUtil.toServerName(server)); -4068 } -4069 return servers; -4070 } -4071 }); -4072 } -4073 -4074 @Override -4075 public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { -4076 final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size()); -4077 for (ServerName server : servers) { -4078 pbServers.add(ProtobufUtil.toServerName(server)); -4079 } -4080 -4081 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { -4082 @Override -4083 public Void rpcCall() throws ServiceException { -4084 RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder() -4085 .addAllServerName(pbServers).build(); -4086 master.removeDrainFromRegionServers(getRpcController(), req); -4087 return null; +4039 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { +4040 @Override +4041 public Void rpcCall() throws ServiceException { +4042 master.drainRegionServers(getRpcController(), +4043 RequestConverter.buildDrainRegionServersRequest(servers)); +4044 return null; +4045 } +4046 }); +4047 } +4048 +4049 @Override +4050 public List<ServerName> listDrainingRegionServers() throws IOException { +4051 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), +4052 getRpcControllerFactory()) { +4053 @Override +4054 public List<ServerName> rpcCall() throws ServiceException { +4055 ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build(); +4056 List<ServerName> servers = new ArrayList<>(); +4057 for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req) +4058 .getServerNameList()) { +4059 servers.add(ProtobufUtil.toServerName(server)); +4060 } +4061 return servers; +4062 } +4063 }); +4064 } +4065 +4066 @Override +4067 public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException { +4068 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { +4069 @Override +4070 public Void rpcCall() throws ServiceException { +4071 master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers)); +4072 return null; +4073 } +4074 }); +4075 } +4076 +4077 @Override +4078 public List<TableCFs> listReplicatedTableCFs() throws IOException { +4079 List<TableCFs> replicatedTableCFs = new ArrayList<>(); +4080 HTableDescriptor[] tables = listTables(); +4081 for (HTableDescriptor table : tables) { +4082 HColumnDescriptor[] columns = table.getColumnFamilies(); +4083 Map<String, Integer> cfs = new HashMap<>(); +4084 for (HColumnDescriptor column : columns) { +4085 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { +4086 cfs.put(column.getNameAsString(), column.getScope()); +4087 } 4088 } -4089 }); -4090 } -4091 -4092 @Override -4093 public List<TableCFs> listReplicatedTableCFs() throws IOException { -4094 List<TableCFs> replicatedTableCFs = new ArrayList<>(); -4095 HTableDescriptor[] tables = listTables(); -4096 for (HTableDescriptor table : tables) { -4097 HColumnDescriptor[] columns = table.getColumnFamilies(); -4098 Map<String, Integer> cfs = new HashMap<>(); -4099 for (HColumnDescriptor column : columns) { -4100 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { -4101 cfs.put(column.getNameAsString(), column.getScope()); -4102 } -4103 } -4104 if (!cfs.isEmpty()) { -4105 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); -4106 } -4107 } -4108 return replicatedTableCFs; -4109 } -4110 -4111 @Override -4112 public void enableTableReplication(final TableName tableName) throws IOException { -4113 if (tableName == null) { -4114 throw new IllegalArgumentException("Table name cannot be null"); -4115 } -4116 if (!tableExists(tableName)) { -4117 throw new TableNotFoundException("Table '" + tableName.getNameAsString() -4118 + "' does not exists."); -4119 } -4120 byte[][] splits = getTableSplits(tableName); -4121 checkAndSyncTableDescToPeers(tableName, splits); -4122 setTableRep(tableName, true); -4123 } -4124 -4125 @Override -4126 public void disableTableReplication(final TableName tableName) throws IOException { -4127 if (tableName == null) { -4128 throw new IllegalArgumentException("Table name is null"); -4129 } -4130 if (!tableExists(tableName)) { -4131 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() -4132 + "' does not exists."); -4133 } -4134 setTableRep(tableName, false); -4135 } -4136 -4137 /** -4138 * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method -4139 * ensures that the name of table and column-families should match. -4140 * @param peerHtd descriptor on peer cluster -4141 * @param localHtd - The HTableDescriptor of table from source cluster. -4142 * @return true If the name of table and column families match and REPLICATION_SCOPE copied -4143 * successfully. false If there is any mismatch in the names. -4144 */ -4145 private boolean copyReplicationScope(final HTableDescriptor peerHtd, -4146 final HTableDescriptor localHtd) { -4147 // Copy the REPLICATION_SCOPE only when table names and the names of -4148 // Column-Families are same. -4149 int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); -4150 -4151 if (result == 0) { -4152 Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator(); -4153 Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator(); -4154 -4155 while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { -4156 HColumnDescriptor remoteHCD = remoteHCDIter.next(); -4157 HColumnDescriptor localHCD = localHCDIter.next(); +4089 if (!cfs.isEmpty()) { +4090 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); +4091 } +4092 } +4093 return replicatedTableCFs; +4094 } +4095 +4096 @Override +4097 public void enableTableReplication(final TableName tableName) throws IOException { +4098 if (tableName == null) { +4099 throw new IllegalArgumentException("Table name cannot be null"); +4100 } +4101 if (!tableExists(tableName)) { +4102 throw new TableNotFoundException("Table '" + tableName.getNameAsString() +4103 + "' does not exists."); +4104 } +4105 byte[][] splits = getTableSplits(tableName); +4106 checkAndSyncTableDescToPeers(tableName, splits); +4107 setTableRep(tableName, true); +4108 } +4109 +4110 @Override +4111 public void disableTableReplication(final TableName tableName) throws IOException { +4112 if (tableName == null) { +4113 throw new IllegalArgumentException("Table name is null"); +4114 } +4115 if (!tableExists(tableName)) { +4116 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() +4117 + "' does not exists."); +4118 } +4119 setTableRep(tableName, false); +4120 } +4121 +4122 /** +4123 * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method +4124 * ensures that the name of table and column-families should match. +4125 * @param peerHtd descriptor on peer cluster +4126 * @param localHtd - The HTableDescriptor of table from source cluster. +4127 * @return true If the name of table and column families match and REPLICATION_SCOPE copied +4128 * successfully. false If there is any mismatch in the names. +4129 */ +4130 private boolean copyReplicationScope(final HTableDescriptor peerHtd, +4131 final HTableDescriptor localHtd) { +4132 // Copy the REPLICATION_SCOPE only when table names and the names of +4133 // Column-Families are same. +4134 int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); +4135 +4136 if (result == 0) { +4137 Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator(); +4138 Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator(); +4139 +4140 while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { +4141 HColumnDescriptor remoteHCD = remoteHCDIter.next(); +4142 HColumnDescriptor localHCD = localHCDIter.next(); +4143 +4144 String remoteHCDName = remoteHCD.getNameAsString(); +4145 String localHCDName = localHCD.getNameAsString(); +4146 +4147 if (remoteHCDName.equals(localHCDName)) { +4148 remoteHCD.setScope(localHCD.getScope()); +4149 } else { +4150 result = -1; +4151 break; +4152 } +4153 } +4154 if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { +4155 return false; +4156 } +4157 } 4158 -4159 String remoteHCDName = remoteHCD.getNameAsString(); -4160 String localHCDName = localHCD.getNameAsString(); +4159 return result == 0; +4160 } 4161 -4162 if (remoteHCDName.equals(localHCDName)) { -4163 remoteHCD.setScope(localHCD.getScope()); -4164 } else { -4165 result = -1; -4166 break; -4167 } -4168 } -4169 if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { -4170 return false; -4171 } -4172 } -4173 -4174 return result == 0; -4175 } -4176 -4177 /** -4178 * Compare the contents of the descriptor with another one passed as a parameter for replication -4179 * purpose. The REPLICATION_SCOPE field is ignored during comparison. -4180 * @param peerHtd descriptor on peer cluster -4181 * @param localHtd descriptor on source cluster which needs to be replicated. -4182 * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). -4183 * @see java.lang.Object#equals(java.lang.Object) -4184 */ -4185 private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { -4186 if (peerHtd == localHtd) { -4187 return true; +4162 /** +4163 * Compare the contents of the descriptor with another one passed as a parameter for replication +4164 * purpose. The REPLICATION_SCOPE field is ignored during comparison. +4165 * @param peerHtd descriptor on peer cluster +4166 * @param localHtd descriptor on source cluster which needs to be replicated. +4167 * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). +4168 * @see java.lang.Object#equals(java.lang.Object) +4169 */ +4170 private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { +4171 if (peerHtd == localHtd) { +4172 return true; +4173 } +4174 if (peerHtd == null) { +4175 return false; +4176 } +4177 boolean result = false; +4178 +4179 // Create a copy of peer HTD as we need to change its replication +4180 // scope to match with the local HTD. +4181 HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); +4182 +4183 result = copyReplicationScope(peerHtdCopy, localHtd); +4184 +4185 // If copy was successful, compare the two tables now. +4186 if (result) { +4187 result = (peerHtdCopy.compareTo(localHtd) == 0); 4188 } -4189 if (peerHtd == null) { -4190 return false; -4191 } -4192 boolean result = false; -4193 -4194 // Create a copy of peer HTD as we need to change its replication -4195 // scope to match with the local HTD. -4196 HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); -4197 -4198 result = copyReplicationScope(peerHtdCopy, localHtd); -4199 -4200 // If copy was successful, compare the two tables now. -4201 if (result) { -4202 result = (peerHtdCopy.compareTo(localHtd) == 0); -4203 } -4204 -4205 return result; -4206 } -4207 -4208 /** -4209 * Connect to peer and check the table descriptor on peer: -4210 * <ol> -4211 * <li>Create the same table on peer when not exist.</li> -4212 * <li>Throw an exception if the table already has replication enabled on any of the column -4213 * families.</li> -4214 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> -4215 * </ol> -4216 * @param tableName name of the table to sync to the peer -4217 * @param splits table split keys -4218 * @throws IOException -4219 */ -4220 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) -4221 throws IOException { -4222 List<ReplicationPeerDescription> peers = listReplicationPeers(); -4223 if (peers == null || peers.size() <= 0) { -4224 throw new IllegalArgumentException("Found no peer cluster for replication."); -4225 } -4226 -4227 for (ReplicationPeerDescription peerDesc : peers) { -4228 if (needToReplicate(tableName, peerDesc)) { -4229 Configuration peerConf = getPeerClusterConfiguration(peerDesc); -4230 try (Connection conn = ConnectionFactory.createConnection(peerConf); -4231 Admin repHBaseAdmin = conn.getAdmin()) { -4232 HTableDescriptor localHtd = getTableDescriptor(tableName); -4233 HTableDescriptor peerHtd = null; -4234 if (!repHBaseAdmin.tableExists(tableName)) { -4235 repHBaseAdmin.createTable(localHtd, splits); -4236 } else { -4237 peerHtd = repHBaseAdmin.getTableDescriptor(tableName); -4238 if (peerHtd == null) { -4239 throw new IllegalArgumentException("Failed to get table descriptor for table " -4240 + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); -4241 } -4242 if (!compareForReplication(peerHtd, localHtd)) { -4243 throw new IllegalArgumentException("Table " + tableName.getNameAsString() -4244 + " exists in peer cluster " + peerDesc.getPeerId() -4245 + ", but the table descriptors are not same when compared with source cluster." -4246 + " Thus can not enable the table's replication switch."); -4247 } -4248 } -4249 } -4250 } -4251 } -4252 } -4253 -4254 /** -4255 * Decide whether the table need replicate to the peer cluster according to the peer config -4256 * @param table name of the table -4257 * @param peer config for the peer -4258 * @return true if the table need replicate to the peer cluster -4259 */ -4260 private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { -4261 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); -4262 Set<String> namespaces = peerConfig.getNamespaces(); -4263 Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap(); -4264 // If null means user has explicitly not configured any namespaces and table CFs -4265 // so all the tables data are applicable for replication -4266 if (namespaces == null && tableCFsMap == null) { -4267 return true; -4268 } -4269 if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { -4270 return true; -4271 } -4272 if (tableCFsMap != null && tableCFsMap.containsKey(table)) { -4273 return true; -4274 } -4275 LOG.debug("Table " + table.getNameAsString() -4276 + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" -4277 + peerConfig.getClusterKey()); -4278 return false; -4279 } -4280 -4281 /** -4282 * Set the table's replication switch if the table's replication switch is already not set. -4283 * @param tableName name of the table -4284 * @param enableRep is replication switch enable or disable -4285 * @throws IOException if a remote or network exception occurs -4286 */ -4287 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { -4288 HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName)); -4289 ReplicationState currentReplicationState = getTableReplicationState(htd); -4290 if (enableRep && currentReplicationState != ReplicationState.ENABLED -4291 || !enableRep && currentReplicationState != ReplicationState.DISABLED) { -4292 for (HColumnDescriptor hcd : htd.getFamilies()) { -4293 hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL -4294 : HConstants.REPLICATION_SCOPE_LOCAL); -4295 } -4296 modifyTable(tableName, htd); -4297 } -4298 } -4299 -4300 /** -4301 * This enum indicates the current state of the replication for a given table. -4302 */ -4303 private enum ReplicationState { -4304 ENABLED, // all column families enabled -4305 MIXED, // some column families enabled, some disabled -4306 DISABLED // all column families disabled -4307 } -4308 -4309 /** -4310 * @param htd table descriptor details for the table to check -4311 * @return ReplicationState the current state of the table. -4312 */ -4313 private ReplicationState getTableReplicationState(HTableDescriptor htd) { -4314 boolean hasEnabled = false; -4315 boolean hasDisabled = false; -4316 -4317 for (HColumnDescriptor hcd : htd.getFamilies()) { -4318 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL -4319 && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { -4320 hasDisabled = true; -4321 } else { -4322 hasEnabled = true; -4323 } -4324 } -4325 -4326 if (hasEnabled && hasDisabled) return ReplicationState.MIXED; -4327 if (hasEnabled) return ReplicationState.ENABLED; -4328 return ReplicationState.DISABLED; -4329 } -4330 -4331 /** -4332 * Returns the configuration needed to talk to the remote slave cluster. -4333 * @param peer the description of replication peer -4334 * @return the configuration for the peer cluster, null if it was unable to get the configuration -4335 * @throws IOException -4336 */ -4337 private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) -4338 throws IOException { -4339 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); -4340 Configuration otherConf; -4341 try { -4342 otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); -4343 } catch (IOException e) { -4344 throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); -4345 } -4346 -4347 if (!peerConfig.getConfiguration().isEmpty()) { -4348 CompoundConfiguration compound = new CompoundConfiguration(); -4349 compound.add(otherConf); -4350 compound.addStringMap(peerConfig.getConfiguration()); -4351 return compound; -4352 } -4353 -4354 return otherConf; -4355 } -4356 -4357 @Override -4358 public void clearCompactionQueues(final ServerName sn, final Set<String> queues) -4359 throws IOException, InterruptedException { -4360 if (queues == null || queues.size() == 0) { -4361 throw new IllegalArgumentException("queues cannot be null or empty"); -4362 } -4363 final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); -4364 Callable<Void> callable = new Callable<Void>() { -4365 @Override -4366 public Void call() throws Exception { -4367 // TODO: There is no timeout on this controller. Set one! -4368 HBaseRpcController controller = rpcControllerFactory.newController(); -4369 ClearCompactionQueuesRequest request = -4370 RequestConverter.buildClearCompactionQueuesRequest(queues); -4371 admin.clearCompactionQueues(controller, request); -4372 return null; -4373 } -4374 }; -4375 ProtobufUtil.call(callable); -4376 } -4377} +4189 +4190 return result; +4191 } +4192 +4193 /** +4194 * Connect to peer and check the table descriptor on peer: +4195 * <ol> +4196 * <li>Create the same table on peer when not exist.</li> +4197 * <li>Throw an exception if the table already has replication enabled on any of the column +4198 * families.</li> +4199 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> +4200 * </ol> +4201 * @param tableName name of the table to sync to the peer +4202 * @param splits table split keys +4203 * @throws IOException +4204 */ +4205 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) +4206 throws IOException { +4207 List<ReplicationPeerDescription> peers = listReplicationPeers(); +4208 if (peers == null || peers.size() <= 0) { +4209 throw new IllegalArgumentException("Found no peer cluster for replication."); +4210 } +4211 +4212 for (ReplicationPeerDescription peerDesc : peers) { +4213 if (needToReplicate(tableName, peerDesc)) { +4214 Configuration peerConf = getPeerClusterConfiguration(peerDesc); +4215 try (Connection conn = ConnectionFactory.createConnection(peerConf); +4216 Admin repHBaseAdmin = conn.getAdmin()) { +4217 HTableDescriptor localHtd = getTableDescriptor(tableName); +4218 HTableDescriptor peerHtd = null; +4219 if (!repHBaseAdmin.tableExists(tableName)) { +4220 repHBaseAdmin.createTable(localHtd, splits); +4221 } else { +4222 peerHtd = repHBaseAdmin.getTableDescriptor(tableName); +4223 if (peerHtd == null) { +4224 throw new IllegalArgumentException("Failed to get table descriptor for table " +4225 + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); +4226 } +4227 if (!compareForReplication(peerHtd, localHtd)) { +4228 throw new IllegalArgumentException("Table " + tableName.getNameAsString() +4229 + " exists in peer cluster " + peerDesc.getPeerId() +4230 + ", but the table descriptors are not same when compared with source cluster." +4231 + " Thus can not enable the table's replication switch."); +4232 } +4233 } +4234 } +4235 } +4236 } +4237 } +4238 +4239 /** +4240 * Decide whether the table need replicate to the peer cluster according to the peer config +4241 * @param table name of the table +4242 * @param peer config for the peer +4243 * @return true if the table need replicate to the peer cluster +4244 */ +4245 private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { +4246 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); +4247 Set<String> namespaces = peerConfig.getNamespaces(); +4248 Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap(); +4249 // If null means user has explicitly not configured any namespaces and table CFs +4250 // so all the tables data are applicable for replication +4251 if (namespaces == null && tableCFsMap == null) { +4252 return true; +4253 } +4254 if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { +4255 return true; +4256 } +4257 if (tableCFsMap != null && tableCFsMap.containsKey(table)) { +4258 return true; +4259 } +4260 LOG.debug("Table " + table.getNameAsString() +4261 + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" +4262 + peerConfig.getClusterKey()); +4263 return false; +4264 } +4265 +4266 /** +4267 * Set the table's replication switch if the table's replication switch is already not set. +4268 * @param tableName name of the table +4269 * @param enableRep is replication switch enable or disable +4270 * @throws IOException if a remote or network exception occurs +4271 */ +4272 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { +4273 HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName)); +4274 ReplicationState currentReplicationState = getTableReplicationState(htd); +4275 if (enableRep && currentReplicationState != ReplicationState.ENABLED +4276 || !enableRep && currentReplicationState != ReplicationState.DISABLED) { +4277 for (HColumnDescriptor hcd : htd.getFamilies()) { +4278 hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL +4279 : HConstants.REPLICATION_SCOPE_LOCAL); +4280 } +4281 modifyTable(tableName, htd); +4282 } +4283 } +4284 +4285 /** +4286 * This enum indicates the current state of the replication for a given table. +4287 */ +4288 private enum ReplicationState { +4289 ENABLED, // all column families enabled +4290 MIXED, // some column families enabled, some disabled +4291 DISABLED // all column families disabled +4292 } +4293 +4294 /** +4295 * @param htd table descriptor details for the table to check +4296 * @return ReplicationState the current state of the table. +4297 */ +4298 private ReplicationState getTableReplicationState(HTableDescriptor htd) { +4299 boolean hasEnabled = false; +4300 boolean hasDisabled = false; +4301 +4302 for (HColumnDescriptor hcd : htd.getFamilies()) { +4303 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL +4304 && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { +4305 hasDisabled = true; +4306 } else { +4307 hasEnabled = true; +4308 } +4309 } +4310 +4311 if (hasEnabled && hasDisabled) return ReplicationState.MIXED; +4312 if (hasEnabled) return ReplicationState.ENABLED; +4313 return ReplicationState.DISABLED; +4314 } +4315 +4316 /** +4317 * Returns the configuration needed to talk to the remote slave cluster. +4318 * @param peer the description of replication peer +4319 * @return the configuration for the peer cluster, null if it was unable to get the configuration +4320 * @throws IOException +4321 */ +4322 private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) +4323 throws IOException { +4324 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); +4325 Configuration otherConf; +4326 try { +4327 otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); +4328 } catch (IOException e) { +4329 throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); +4330 } +4331 +4332 if (!peerConfig.getConfiguration().isEmpty()) { +4333 CompoundConfiguration compound = new CompoundConfiguration(); +4334 compound.add(otherConf); +4335 compound.addStringMap(peerConfig.getConfiguration()); +4336 return compound; +4337 } +4338 +4339 return otherConf; +4340 } +4341 +4342 @Override +4343 public void clearCompactionQueues(final ServerName sn, final Set<String> queues) +4344 throws IOException, InterruptedException { +4345 if (queues == null || queues.size() == 0) { +4346 throw new IllegalArgumentException("queues cannot be null or empty"); +4347 } +4348 final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); +4349 Callable<Void> callable = new Callable<Void>() { +4350 @Override +4351 public Void call() throws Exception { +4352 // TODO: There is no timeout on this controller. Set one! +4353 HBaseRpcController controller = rpcControllerFactory.newController(); +4354 ClearCompactionQueuesRequest request = +4355 RequestConverter.buildClearCompactionQueuesRequest(queues); +4356 admin.clearCompactionQueues(controller, request); +4357 return null; +4358 } +4359 }; +4360 ProtobufUtil.call(callable); +4361 } +4362}