Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7D285200B40 for ; Fri, 17 Jun 2016 00:39:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7BA3C160A66; Thu, 16 Jun 2016 22:39:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 350D0160A63 for ; Fri, 17 Jun 2016 00:39:09 +0200 (CEST) Received: (qmail 23814 invoked by uid 500); 16 Jun 2016 22:39:02 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 20264 invoked by uid 99); 16 Jun 2016 22:39:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jun 2016 22:39:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16918E009D; Thu, 16 Jun 2016 22:39:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Thu, 16 Jun 2016 22:39:41 -0000 Message-Id: <9019f22fce6348d0b9322de137365de1@git.apache.org> In-Reply-To: <2bb5fea3d05240089da56857bc07c624@git.apache.org> References: <2bb5fea3d05240089da56857bc07c624@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [43/52] [partial] hbase-site git commit: Published site at 6d02f36ac78426f178b977566c170fb2e4b3503a. archived-at: Thu, 16 Jun 2016 22:39:11 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/b80ecc4e/apidocs/src-html/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.html ---------------------------------------------------------------------- diff --git a/apidocs/src-html/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.html b/apidocs/src-html/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.html index 8714acb..894ccdd 100644 --- a/apidocs/src-html/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.html +++ b/apidocs/src-html/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.html @@ -62,578 +62,580 @@ 054import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; 055import org.apache.hadoop.hbase.replication.ReplicationPeers; 056import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -057import org.apache.hadoop.hbase.util.Pair; -058import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -059 -060/** -061 * <p> -062 * This class provides the administrative interface to HBase cluster -063 * replication. In order to use it, the cluster and the client using -064 * ReplicationAdmin must be configured with <code>hbase.replication</code> -065 * set to true. -066 * </p> -067 * <p> -068 * Adding a new peer results in creating new outbound connections from every -069 * region server to a subset of region servers on the slave cluster. Each -070 * new stream of replication will start replicating from the beginning of the -071 * current WAL, meaning that edits from that past will be replicated. -072 * </p> -073 * <p> -074 * Removing a peer is a destructive and irreversible operation that stops -075 * all the replication streams for the given cluster and deletes the metadata -076 * used to keep track of the replication state. -077 * </p> -078 * <p> -079 * To see which commands are available in the shell, type -080 * <code>replication</code>. -081 * </p> -082 */ -083@InterfaceAudience.Public -084@InterfaceStability.Evolving -085public class ReplicationAdmin implements Closeable { -086 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class); -087 -088 public static final String TNAME = "tableName"; -089 public static final String CFNAME = "columnFamilyName"; -090 -091 // only Global for now, can add other type -092 // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. -093 public static final String REPLICATIONTYPE = "replicationType"; -094 public static final String REPLICATIONGLOBAL = Integer -095 .toString(HConstants.REPLICATION_SCOPE_GLOBAL); -096 -097 private final Connection connection; -098 // TODO: replication should be managed by master. All the classes except ReplicationAdmin should -099 // be moved to hbase-server. Resolve it in HBASE-11392. -100 private final ReplicationQueuesClient replicationQueuesClient; -101 private final ReplicationPeers replicationPeers; -102 /** -103 * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose -104 * on {@link #close()}. -105 */ -106 private final ZooKeeperWatcher zkw; -107 -108 /** -109 * Constructor that creates a connection to the local ZooKeeper ensemble. -110 * @param conf Configuration to use -111 * @throws IOException if an internal replication error occurs -112 * @throws RuntimeException if replication isn't enabled. -113 */ -114 public ReplicationAdmin(Configuration conf) throws IOException { -115 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, -116 HConstants.REPLICATION_ENABLE_DEFAULT)) { -117 throw new RuntimeException("hbase.replication isn't true, please " + -118 "enable it in order to use replication"); -119 } -120 this.connection = ConnectionFactory.createConnection(conf); -121 try { -122 zkw = createZooKeeperWatcher(); -123 try { -124 this.replicationQueuesClient = -125 ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); -126 this.replicationQueuesClient.init(); -127 this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, -128 this.replicationQueuesClient, this.connection); -129 this.replicationPeers.init(); -130 } catch (Exception exception) { -131 if (zkw != null) { -132 zkw.close(); -133 } -134 throw exception; -135 } -136 } catch (Exception exception) { -137 if (connection != null) { -138 connection.close(); -139 } -140 if (exception instanceof IOException) { -141 throw (IOException) exception; -142 } else if (exception instanceof RuntimeException) { -143 throw (RuntimeException) exception; -144 } else { -145 throw new IOException("Error initializing the replication admin client.", exception); -146 } -147 } -148 } -149 -150 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException { -151 // This Abortable doesn't 'abort'... it just logs. -152 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() { -153 @Override -154 public void abort(String why, Throwable e) { -155 LOG.error(why, e); -156 // We used to call system.exit here but this script can be embedded by other programs that -157 // want to do replication stuff... so inappropriate calling System.exit. Just log for now. -158 } -159 -160 @Override -161 public boolean isAborted() { -162 return false; -163 } -164 }); -165 } -166 -167 /** -168 * Add a new remote slave cluster for replication. -169 * @param id a short name that identifies the cluster -170 * @param peerConfig configuration for the replication slave cluster -171 * @param tableCfs the table and column-family list which will be replicated for this peer. -172 * A map from tableName to column family names. An empty collection can be passed -173 * to indicate replicating all column families. Pass null for replicating all table and column -174 * families -175 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, -176 * use {@link #addPeer(String, ReplicationPeerConfig)} instead. -177 */ -178 @Deprecated -179 public void addPeer(String id, ReplicationPeerConfig peerConfig, -180 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { -181 if (tableCfs != null) { -182 peerConfig.setTableCFsMap(tableCfs); -183 } -184 this.replicationPeers.addPeer(id, peerConfig); -185 } -186 -187 /** -188 * Add a new remote slave cluster for replication. -189 * @param id a short name that identifies the cluster -190 * @param peerConfig configuration for the replication slave cluster -191 */ -192 public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { -193 this.replicationPeers.addPeer(id, peerConfig); -194 } -195 -196 /** -197 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0 -198 * */ -199 @Deprecated -200 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { -201 return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); -202 } -203 -204 public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) -205 throws ReplicationException { -206 this.replicationPeers.updatePeerConfig(id, peerConfig); -207 } -208 /** -209 * Removes a peer cluster and stops the replication to it. -210 * @param id a short name that identifies the cluster -211 */ -212 public void removePeer(String id) throws ReplicationException { -213 this.replicationPeers.removePeer(id); -214 } -215 -216 /** -217 * Restart the replication stream to the specified peer. -218 * @param id a short name that identifies the cluster -219 */ -220 public void enablePeer(String id) throws ReplicationException { -221 this.replicationPeers.enablePeer(id); -222 } -223 -224 /** -225 * Stop the replication stream to the specified peer. -226 * @param id a short name that identifies the cluster -227 */ -228 public void disablePeer(String id) throws ReplicationException { -229 this.replicationPeers.disablePeer(id); -230 } -231 -232 /** -233 * Get the number of slave clusters the local cluster has. -234 * @return number of slave clusters -235 */ -236 public int getPeersCount() { -237 return this.replicationPeers.getAllPeerIds().size(); -238 } -239 -240 public Map<String, ReplicationPeerConfig> listPeerConfigs() { -241 return this.replicationPeers.getAllPeerConfigs(); -242 } -243 -244 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException { -245 return this.replicationPeers.getReplicationPeerConfig(id); -246 } -247 -248 /** -249 * Get the replicable table-cf config of the specified peer. -250 * @param id a short name that identifies the cluster -251 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, -252 * use {@link #getPeerConfig(String)} instead. -253 * */ -254 @Deprecated -255 public String getPeerTableCFs(String id) throws ReplicationException { -256 return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id)); -257 } -258 -259 /** -260 * Append the replicable table-cf config of the specified peer -261 * @param id a short that identifies the cluster -262 * @param tableCfs table-cfs config str -263 * @throws ReplicationException -264 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, -265 * use {@link #appendPeerTableCFs(String, Map)} instead. -266 */ -267 @Deprecated -268 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException { -269 appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); -270 } -271 -272 /** -273 * Append the replicable table-cf config of the specified peer -274 * @param id a short that identifies the cluster -275 * @param tableCfs A map from tableName to column family names -276 * @throws ReplicationException -277 */ -278 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) -279 throws ReplicationException { -280 if (tableCfs == null) { -281 throw new ReplicationException("tableCfs is null"); -282 } -283 Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); -284 if (preTableCfs == null) { -285 setPeerTableCFs(id, tableCfs); -286 return; -287 } -288 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { -289 TableName table = entry.getKey(); -290 Collection<String> appendCfs = entry.getValue(); -291 if (preTableCfs.containsKey(table)) { -292 List<String> cfs = preTableCfs.get(table); -293 if (cfs == null || appendCfs == null) { -294 preTableCfs.put(table, null); -295 } else { -296 Set<String> cfSet = new HashSet<String>(cfs); -297 cfSet.addAll(appendCfs); -298 preTableCfs.put(table, Lists.newArrayList(cfSet)); -299 -300 } -301 } else { -302 if (appendCfs == null || appendCfs.isEmpty()) { -303 preTableCfs.put(table, null); -304 } else { -305 preTableCfs.put(table, Lists.newArrayList(appendCfs)); -306 } -307 } -308 } -309 setPeerTableCFs(id, preTableCfs); -310 } -311 -312 /** -313 * Remove some table-cfs from table-cfs config of the specified peer -314 * @param id a short name that identifies the cluster -315 * @param tableCf table-cfs config str -316 * @throws ReplicationException -317 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, -318 * use {@link #removePeerTableCFs(String, Map)} instead. -319 */ -320 @Deprecated -321 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException { -322 removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); -323 } -324 -325 /** -326 * Remove some table-cfs from config of the specified peer -327 * @param id a short name that identifies the cluster -328 * @param tableCfs A map from tableName to column family names -329 * @throws ReplicationException -330 */ -331 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) -332 throws ReplicationException { -333 if (tableCfs == null) { -334 throw new ReplicationException("tableCfs is null"); -335 } -336 Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); -337 if (preTableCfs == null) { -338 throw new ReplicationException("Table-Cfs for peer" + id + " is null"); -339 } -340 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) { -341 -342 TableName table = entry.getKey(); -343 Collection<String> removeCfs = entry.getValue(); -344 if (preTableCfs.containsKey(table)) { -345 List<String> cfs = preTableCfs.get(table); -346 if (cfs == null && removeCfs == null) { -347 preTableCfs.remove(table); -348 } else if (cfs != null && removeCfs != null) { -349 Set<String> cfSet = new HashSet<String>(cfs); -350 cfSet.removeAll(removeCfs); -351 if (cfSet.isEmpty()) { -352 preTableCfs.remove(table); -353 } else { -354 preTableCfs.put(table, Lists.newArrayList(cfSet)); -355 } -356 } else if (cfs == null && removeCfs != null) { -357 throw new ReplicationException("Cannot remove cf of table: " + table -358 + " which doesn't specify cfs from table-cfs config in peer: " + id); -359 } else if (cfs != null && removeCfs == null) { -360 throw new ReplicationException("Cannot remove table: " + table -361 + " which has specified cfs from table-cfs config in peer: " + id); -362 } -363 } else { -364 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); -365 -366 } -367 } -368 setPeerTableCFs(id, preTableCfs); -369 } -370 -371 /** -372 * Set the replicable table-cf config of the specified peer -373 * @param id a short name that identifies the cluster -374 * @param tableCfs the table and column-family list which will be replicated for this peer. -375 * A map from tableName to column family names. An empty collection can be passed -376 * to indicate replicating all column families. Pass null for replicating all table and column -377 * families -378 */ -379 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) -380 throws ReplicationException { -381 this.replicationPeers.setPeerTableCFsConfig(id, tableCfs); -382 } -383 -384 /** -385 * Get the state of the specified peer cluster -386 * @param id String format of the Short name that identifies the peer, -387 * an IllegalArgumentException is thrown if it doesn't exist -388 * @return true if replication is enabled to that peer, false if it isn't -389 */ -390 public boolean getPeerState(String id) throws ReplicationException { -391 return this.replicationPeers.getStatusOfPeerFromBackingStore(id); -392 } -393 -394 @Override -395 public void close() throws IOException { -396 if (this.zkw != null) { -397 this.zkw.close(); -398 } -399 if (this.connection != null) { -400 this.connection.close(); -401 } -402 } -403 -404 -405 /** -406 * Find all column families that are replicated from this cluster -407 * @return the full list of the replicated column families of this cluster as: -408 * tableName, family name, replicationType -409 * -410 * Currently replicationType is Global. In the future, more replication -411 * types may be extended here. For example -412 * 1) the replication may only apply to selected peers instead of all peers -413 * 2) the replicationType may indicate the host Cluster servers as Slave -414 * for the table:columnFam. -415 */ -416 public List<HashMap<String, String>> listReplicated() throws IOException { -417 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>(); -418 -419 Admin admin = connection.getAdmin(); -420 HTableDescriptor[] tables; -421 try { -422 tables = admin.listTables(); -423 } finally { -424 if (admin!= null) admin.close(); -425 } -426 -427 for (HTableDescriptor table : tables) { -428 HColumnDescriptor[] columns = table.getColumnFamilies(); -429 String tableName = table.getNameAsString(); -430 for (HColumnDescriptor column : columns) { -431 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { -432 // At this moment, the columfam is replicated to all peers -433 HashMap<String, String> replicationEntry = new HashMap<String, String>(); -434 replicationEntry.put(TNAME, tableName); -435 replicationEntry.put(CFNAME, column.getNameAsString()); -436 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); -437 replicationColFams.add(replicationEntry); -438 } -439 } -440 } -441 -442 return replicationColFams; -443 } -444 -445 /** -446 * Enable a table's replication switch. -447 * @param tableName name of the table -448 * @throws IOException if a remote or network exception occurs -449 */ -450 public void enableTableRep(final TableName tableName) throws IOException { -451 if (tableName == null) { -452 throw new IllegalArgumentException("Table name cannot be null"); -453 } -454 try (Admin admin = this.connection.getAdmin()) { -455 if (!admin.tableExists(tableName)) { -456 throw new TableNotFoundException("Table '" + tableName.getNameAsString() -457 + "' does not exists."); -458 } -459 } -460 byte[][] splits = getTableSplitRowKeys(tableName); -461 checkAndSyncTableDescToPeers(tableName, splits); -462 setTableRep(tableName, true); -463 } -464 -465 /** -466 * Disable a table's replication switch. -467 * @param tableName name of the table -468 * @throws IOException if a remote or network exception occurs -469 */ -470 public void disableTableRep(final TableName tableName) throws IOException { -471 if (tableName == null) { -472 throw new IllegalArgumentException("Table name is null"); -473 } -474 try (Admin admin = this.connection.getAdmin()) { -475 if (!admin.tableExists(tableName)) { -476 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() -477 + "' does not exists."); -478 } -479 } -480 setTableRep(tableName, false); -481 } -482 -483 /** -484 * Get the split row keys of table -485 * @param tableName table name -486 * @return array of split row keys -487 * @throws IOException -488 */ -489 private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException { -490 try (RegionLocator locator = connection.getRegionLocator(tableName);) { -491 byte[][] startKeys = locator.getStartKeys(); -492 if (startKeys.length == 1) { -493 return null; -494 } -495 byte[][] splits = new byte[startKeys.length - 1][]; -496 for (int i = 1; i < startKeys.length; i++) { -497 splits[i - 1] = startKeys[i]; -498 } -499 return splits; -500 } -501 } -502 -503 /** -504 * Connect to peer and check the table descriptor on peer: -505 * <ol> -506 * <li>Create the same table on peer when not exist.</li> -507 * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li> -508 * </ol> -509 * @param tableName name of the table to sync to the peer -510 * @param splits table split keys -511 * @throws IOException -512 */ -513 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) -514 throws IOException { -515 List<ReplicationPeer> repPeers = listReplicationPeers(); -516 if (repPeers == null || repPeers.size() <= 0) { -517 throw new IllegalArgumentException("Found no peer cluster for replication."); -518 } -519 -520 final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); +057import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +058import org.apache.hadoop.hbase.util.Pair; +059import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +060 +061/** +062 * <p> +063 * This class provides the administrative interface to HBase cluster +064 * replication. In order to use it, the cluster and the client using +065 * ReplicationAdmin must be configured with <code>hbase.replication</code> +066 * set to true. +067 * </p> +068 * <p> +069 * Adding a new peer results in creating new outbound connections from every +070 * region server to a subset of region servers on the slave cluster. Each +071 * new stream of replication will start replicating from the beginning of the +072 * current WAL, meaning that edits from that past will be replicated. +073 * </p> +074 * <p> +075 * Removing a peer is a destructive and irreversible operation that stops +076 * all the replication streams for the given cluster and deletes the metadata +077 * used to keep track of the replication state. +078 * </p> +079 * <p> +080 * To see which commands are available in the shell, type +081 * <code>replication</code>. +082 * </p> +083 */ +084@InterfaceAudience.Public +085@InterfaceStability.Evolving +086public class ReplicationAdmin implements Closeable { +087 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class); +088 +089 public static final String TNAME = "tableName"; +090 public static final String CFNAME = "columnFamilyName"; +091 +092 // only Global for now, can add other type +093 // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. +094 public static final String REPLICATIONTYPE = "replicationType"; +095 public static final String REPLICATIONGLOBAL = Integer +096 .toString(HConstants.REPLICATION_SCOPE_GLOBAL); +097 +098 private final Connection connection; +099 // TODO: replication should be managed by master. All the classes except ReplicationAdmin should +100 // be moved to hbase-server. Resolve it in HBASE-11392. +101 private final ReplicationQueuesClient replicationQueuesClient; +102 private final ReplicationPeers replicationPeers; +103 /** +104 * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose +105 * on {@link #close()}. +106 */ +107 private final ZooKeeperWatcher zkw; +108 +109 /** +110 * Constructor that creates a connection to the local ZooKeeper ensemble. +111 * @param conf Configuration to use +112 * @throws IOException if an internal replication error occurs +113 * @throws RuntimeException if replication isn't enabled. +114 */ +115 public ReplicationAdmin(Configuration conf) throws IOException { +116 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, +117 HConstants.REPLICATION_ENABLE_DEFAULT)) { +118 throw new RuntimeException("hbase.replication isn't true, please " + +119 "enable it in order to use replication"); +120 } +121 this.connection = ConnectionFactory.createConnection(conf); +122 try { +123 zkw = createZooKeeperWatcher(); +124 try { +125 this.replicationQueuesClient = +126 ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, +127 this.connection, zkw)); +128 this.replicationQueuesClient.init(); +129 this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, +130 this.replicationQueuesClient, this.connection); +131 this.replicationPeers.init(); +132 } catch (Exception exception) { +133 if (zkw != null) { +134 zkw.close(); +135 } +136 throw exception; +137 } +138 } catch (Exception exception) { +139 if (connection != null) { +140 connection.close(); +141 } +142 if (exception instanceof IOException) { +143 throw (IOException) exception; +144 } else if (exception instanceof RuntimeException) { +145 throw (RuntimeException) exception; +146 } else { +147 throw new IOException("Error initializing the replication admin client.", exception); +148 } +149 } +150 } +151 +152 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException { +153 // This Abortable doesn't 'abort'... it just logs. +154 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() { +155 @Override +156 public void abort(String why, Throwable e) { +157 LOG.error(why, e); +158 // We used to call system.exit here but this script can be embedded by other programs that +159 // want to do replication stuff... so inappropriate calling System.exit. Just log for now. +160 } +161 +162 @Override +163 public boolean isAborted() { +164 return false; +165 } +166 }); +167 } +168 +169 /** +170 * Add a new remote slave cluster for replication. +171 * @param id a short name that identifies the cluster +172 * @param peerConfig configuration for the replication slave cluster +173 * @param tableCfs the table and column-family list which will be replicated for this peer. +174 * A map from tableName to column family names. An empty collection can be passed +175 * to indicate replicating all column families. Pass null for replicating all table and column +176 * families +177 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, +178 * use {@link #addPeer(String, ReplicationPeerConfig)} instead. +179 */ +180 @Deprecated +181 public void addPeer(String id, ReplicationPeerConfig peerConfig, +182 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { +183 if (tableCfs != null) { +184 peerConfig.setTableCFsMap(tableCfs); +185 } +186 this.replicationPeers.addPeer(id, peerConfig); +187 } +188 +189 /** +190 * Add a new remote slave cluster for replication. +191 * @param id a short name that identifies the cluster +192 * @param peerConfig configuration for the replication slave cluster +193 */ +194 public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { +195 this.replicationPeers.addPeer(id, peerConfig); +196 } +197 +198 /** +199 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0 +200 * */ +201 @Deprecated +202 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { +203 return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); +204 } +205 +206 public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) +207 throws ReplicationException { +208 this.replicationPeers.updatePeerConfig(id, peerConfig); +209 } +210 /** +211 * Removes a peer cluster and stops the replication to it. +212 * @param id a short name that identifies the cluster +213 */ +214 public void removePeer(String id) throws ReplicationException { +215 this.replicationPeers.removePeer(id); +216 } +217 +218 /** +219 * Restart the replication stream to the specified peer. +220 * @param id a short name that identifies the cluster +221 */ +222 public void enablePeer(String id) throws ReplicationException { +223 this.replicationPeers.enablePeer(id); +224 } +225 +226 /** +227 * Stop the replication stream to the specified peer. +228 * @param id a short name that identifies the cluster +229 */ +230 public void disablePeer(String id) throws ReplicationException { +231 this.replicationPeers.disablePeer(id); +232 } +233 +234 /** +235 * Get the number of slave clusters the local cluster has. +236 * @return number of slave clusters +237 */ +238 public int getPeersCount() { +239 return this.replicationPeers.getAllPeerIds().size(); +240 } +241 +242 public Map<String, ReplicationPeerConfig> listPeerConfigs() { +243 return this.replicationPeers.getAllPeerConfigs(); +244 } +245 +246 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException { +247 return this.replicationPeers.getReplicationPeerConfig(id); +248 } +249 +250 /** +251 * Get the replicable table-cf config of the specified peer. +252 * @param id a short name that identifies the cluster +253 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, +254 * use {@link #getPeerConfig(String)} instead. +255 * */ +256 @Deprecated +257 public String getPeerTableCFs(String id) throws ReplicationException { +258 return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id)); +259 } +260 +261 /** +262 * Append the replicable table-cf config of the specified peer +263 * @param id a short that identifies the cluster +264 * @param tableCfs table-cfs config str +265 * @throws ReplicationException +266 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, +267 * use {@link #appendPeerTableCFs(String, Map)} instead. +268 */ +269 @Deprecated +270 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException { +271 appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); +272 } +273 +274 /** +275 * Append the replicable table-cf config of the specified peer +276 * @param id a short that identifies the cluster +277 * @param tableCfs A map from tableName to column family names +278 * @throws ReplicationException +279 */ +280 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) +281 throws ReplicationException { +282 if (tableCfs == null) { +283 throw new ReplicationException("tableCfs is null"); +284 } +285 Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); +286 if (preTableCfs == null) { +287 setPeerTableCFs(id, tableCfs); +288 return; +289 } +290 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { +291 TableName table = entry.getKey(); +292 Collection<String> appendCfs = entry.getValue(); +293 if (preTableCfs.containsKey(table)) { +294 List<String> cfs = preTableCfs.get(table); +295 if (cfs == null || appendCfs == null) { +296 preTableCfs.put(table, null); +297 } else { +298 Set<String> cfSet = new HashSet<String>(cfs); +299 cfSet.addAll(appendCfs); +300 preTableCfs.put(table, Lists.newArrayList(cfSet)); +301 +302 } +303 } else { +304 if (appendCfs == null || appendCfs.isEmpty()) { +305 preTableCfs.put(table, null); +306 } else { +307 preTableCfs.put(table, Lists.newArrayList(appendCfs)); +308 } +309 } +310 } +311 setPeerTableCFs(id, preTableCfs); +312 } +313 +314 /** +315 * Remove some table-cfs from table-cfs config of the specified peer +316 * @param id a short name that identifies the cluster +317 * @param tableCf table-cfs config str +318 * @throws ReplicationException +319 * @deprecated as release of 2.0.0, and it will be removed in 3.0.0, +320 * use {@link #removePeerTableCFs(String, Map)} instead. +321 */ +322 @Deprecated +323 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException { +324 removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); +325 } +326 +327 /** +328 * Remove some table-cfs from config of the specified peer +329 * @param id a short name that identifies the cluster +330 * @param tableCfs A map from tableName to column family names +331 * @throws ReplicationException +332 */ +333 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) +334 throws ReplicationException { +335 if (tableCfs == null) { +336 throw new ReplicationException("tableCfs is null"); +337 } +338 Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); +339 if (preTableCfs == null) { +340 throw new ReplicationException("Table-Cfs for peer" + id + " is null"); +341 } +342 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) { +343 +344 TableName table = entry.getKey(); +345 Collection<String> removeCfs = entry.getValue(); +346 if (preTableCfs.containsKey(table)) { +347 List<String> cfs = preTableCfs.get(table); +348 if (cfs == null && removeCfs == null) { +349 preTableCfs.remove(table); +350 } else if (cfs != null && removeCfs != null) { +351 Set<String> cfSet = new HashSet<String>(cfs); +352 cfSet.removeAll(removeCfs); +353 if (cfSet.isEmpty()) { +354 preTableCfs.remove(table); +355 } else { +356 preTableCfs.put(table, Lists.newArrayList(cfSet)); +357 } +358 } else if (cfs == null && removeCfs != null) { +359 throw new ReplicationException("Cannot remove cf of table: " + table +360 + " which doesn't specify cfs from table-cfs config in peer: " + id); +361 } else if (cfs != null && removeCfs == null) { +362 throw new ReplicationException("Cannot remove table: " + table +363 + " which has specified cfs from table-cfs config in peer: " + id); +364 } +365 } else { +366 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); +367 +368 } +369 } +370 setPeerTableCFs(id, preTableCfs); +371 } +372 +373 /** +374 * Set the replicable table-cf config of the specified peer +375 * @param id a short name that identifies the cluster +376 * @param tableCfs the table and column-family list which will be replicated for this peer. +377 * A map from tableName to column family names. An empty collection can be passed +378 * to indicate replicating all column families. Pass null for replicating all table and column +379 * families +380 */ +381 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) +382 throws ReplicationException { +383 this.replicationPeers.setPeerTableCFsConfig(id, tableCfs); +384 } +385 +386 /** +387 * Get the state of the specified peer cluster +388 * @param id String format of the Short name that identifies the peer, +389 * an IllegalArgumentException is thrown if it doesn't exist +390 * @return true if replication is enabled to that peer, false if it isn't +391 */ +392 public boolean getPeerState(String id) throws ReplicationException { +393 return this.replicationPeers.getStatusOfPeerFromBackingStore(id); +394 } +395 +396 @Override +397 public void close() throws IOException { +398 if (this.zkw != null) { +399 this.zkw.close(); +400 } +401 if (this.connection != null) { +402 this.connection.close(); +403 } +404 } +405 +406 +407 /** +408 * Find all column families that are replicated from this cluster +409 * @return the full list of the replicated column families of this cluster as: +410 * tableName, family name, replicationType +411 * +412 * Currently replicationType is Global. In the future, more replication +413 * types may be extended here. For example +414 * 1) the replication may only apply to selected peers instead of all peers +415 * 2) the replicationType may indicate the host Cluster servers as Slave +416 * for the table:columnFam. +417 */ +418 public List<HashMap<String, String>> listReplicated() throws IOException { +419 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>(); +420 +421 Admin admin = connection.getAdmin(); +422 HTableDescriptor[] tables; +423 try { +424 tables = admin.listTables(); +425 } finally { +426 if (admin!= null) admin.close(); +427 } +428 +429 for (HTableDescriptor table : tables) { +430 HColumnDescriptor[] columns = table.getColumnFamilies(); +431 String tableName = table.getNameAsString(); +432 for (HColumnDescriptor column : columns) { +433 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { +434 // At this moment, the columfam is replicated to all peers +435 HashMap<String, String> replicationEntry = new HashMap<String, String>(); +436 replicationEntry.put(TNAME, tableName); +437 replicationEntry.put(CFNAME, column.getNameAsString()); +438 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); +439 replicationColFams.add(replicationEntry); +440 } +441 } +442 } +443 +444 return replicationColFams; +445 } +446 +447 /** +448 * Enable a table's replication switch. +449 * @param tableName name of the table +450 * @throws IOException if a remote or network exception occurs +451 */ +452 public void enableTableRep(final TableName tableName) throws IOException { +453 if (tableName == null) { +454 throw new IllegalArgumentException("Table name cannot be null"); +455 } +456 try (Admin admin = this.connection.getAdmin()) { +457 if (!admin.tableExists(tableName)) { +458 throw new TableNotFoundException("Table '" + tableName.getNameAsString() +459 + "' does not exists."); +460 } +461 } +462 byte[][] splits = getTableSplitRowKeys(tableName); +463 checkAndSyncTableDescToPeers(tableName, splits); +464 setTableRep(tableName, true); +465 } +466 +467 /** +468 * Disable a table's replication switch. +469 * @param tableName name of the table +470 * @throws IOException if a remote or network exception occurs +471 */ +472 public void disableTableRep(final TableName tableName) throws IOException { +473 if (tableName == null) { +474 throw new IllegalArgumentException("Table name is null"); +475 } +476 try (Admin admin = this.connection.getAdmin()) { +477 if (!admin.tableExists(tableName)) {