- The replication feature of Apache HBase (TM) provides a way to copy data between HBase deployments. It
- can serve as a disaster recovery solution and can contribute to
- higher availability at the HBase layer. It can also serve more practical purposes,
- for example as a way to easily copy edits from a web-facing cluster to a "MapReduce"
- cluster which will process old and new data and ship back the results
- automatically.
-
- The basic architecture pattern used for Apache HBase replication is (HBase cluster) master-push;
- this makes it much easier to keep track of what’s currently being replicated, since
- each region server has its own write-ahead log (aka WAL or HLog), unlike
- other well-known solutions such as MySQL master/slave replication, where
- there’s only one binlog to keep track of. One master cluster can
- replicate to any number of slave clusters, and each region server
- participates in replicating its own stream of edits. For more information
- on the different properties of master/slave replication and other types
- of replication, please consult
- How Google Serves Data From Multiple Datacenters.
-
- The replication is done asynchronously, meaning that the clusters can
- be geographically distant, the links between them can be offline for
- some time, and rows inserted on the master cluster won’t be
- available at the same time on the slave clusters (eventual consistency).
-
- The replication format used in this design is conceptually the same as
- MySQL’s statement-based replication. Instead of SQL statements, whole
- WALEdits (consisting of multiple cell inserts coming from the clients'
- Put and Delete) are replicated in order to maintain atomicity.
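
For illustration, a minimal client-side sketch (assuming an existing table named "usertable" with a column family "info"): the two cells added to the single Put below are written by the region server as one WALEdit, so they are shipped and applied together.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiCellPut {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "usertable");   // example table name
    Put put = new Put(Bytes.toBytes("row1"));
    // Both cells come from the same client Put, so the region server
    // writes them as a single WALEdit and they are replicated together.
    put.add(Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes("value-a"));
    put.add(Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes("value-b"));
    table.put(put);
    table.close();
  }
}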
-
- The HLogs from each region server are the basis of HBase replication,
- and must be kept in HDFS as long as they are needed to replicate data
- to any slave cluster. Each RS reads from the oldest log it needs to
- replicate and keeps the current position inside ZooKeeper to simplify
- failure recovery. That position can be different for every slave
- cluster, as can the queue of HLogs to process.
-
- The clusters participating in replication can be of asymmetric sizes
- and the master cluster will do its “best effort” to balance the stream
- of replication on the slave clusters by relying on randomization.
-
- As of version 0.92, Apache HBase supports master/master and cyclic
- replication as well as replication to multiple slaves.
-
- The guide on enabling and using cluster replication is contained
- in the API documentation shipped with your Apache HBase distribution.
-
- The most up-to-date documentation is available at this address.
-
- The following sections describe the life of a single edit going from a
- client that communicates with a master cluster all the way to a single
- slave cluster.
-
- The client uses an API that sends a Put, Delete or ICV to a region
- server. The key values are transformed into a WALEdit by the region
- server, and the WALEdit is inspected by the replication code which, for each family
- that is scoped for replication, adds the scope to the edit. The edit
- is appended to the current WAL and is then applied to the MemStore.
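
A family is only considered for replication if its replication scope is GLOBAL (1). A minimal sketch of setting that scope, assuming a new table is being created (the table and family names are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateReplicatedTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor("usertable");  // example name
    HColumnDescriptor family = new HColumnDescriptor("info");
    family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);       // scope 1 = replicate
    desc.addFamily(family);
    admin.createTable(desc);
  }
}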
-
- In a separate thread, the edit is read from the log (as part of a batch)
- and only the KVs that are replicable are kept (that is, those that belong
- to a family scoped GLOBAL in the family's schema, are not part of a catalog table such as
- hbase:meta or -ROOT-, and did not originate in the target slave cluster, in the
- case of cyclic replication).
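
The following is an illustrative sketch of that filtering decision, not the actual ReplicationSource code; the method and parameter names are assumptions made for the example.

import java.util.UUID;

public class ReplicationFilterSketch {

  /** Returns true if a KV from this family should be shipped to the given slave. */
  static boolean isReplicable(int familyScope,
                              String tableName,
                              UUID originClusterId,
                              UUID slaveClusterId) {
    boolean scopedGlobal = familyScope == 1;                   // REPLICATION_SCOPE_GLOBAL
    boolean isCatalog = tableName.equals("hbase:meta")
                     || tableName.equals("-ROOT-");            // catalog tables are skipped
    boolean cameFromSlave = originClusterId.equals(slaveClusterId); // avoid cycles
    return scopedGlobal && !isCatalog && !cameFromSlave;
  }
}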
-
- The edit is then tagged with the master's cluster UUID.
- When the buffer is filled, or the reader hits the end of the file,
- the buffer is sent to a random region server on the slave cluster.
-
- Synchronously, the region server that receives the edits reads them
- sequentially and separates each of them into buffers, one per table.
- Once all edits are read, each buffer is flushed using HTable, the normal
- HBase client. The master's cluster UUID is retained in the edits applied at
- the slave cluster in order to allow cyclic replication.
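
A simplified sketch of the sink side described above; it assumes the received edits have already been turned into (table name, Put) pairs, which is a simplification of the real WALEdit handling.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class SinkSketch {
  static void applyEdits(Map<String, List<Put>> putsByTable) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // One buffer per table, flushed with the normal HBase client.
    for (Map.Entry<String, List<Put>> entry : putsByTable.entrySet()) {
      HTable table = new HTable(conf, entry.getKey());
      try {
        table.put(entry.getValue());   // batch put of the whole buffer
      } finally {
        table.close();
      }
    }
  }
}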
-
- Back in the master cluster's region server, the offset for the current
- WAL that's being replicated is registered in ZooKeeper.
-
- In the case of a non-responding slave cluster, the edit is inserted on the master cluster in the same way.
-
- In the separate thread, the region server reads, filters and buffers
- the log edits the same way as during normal processing. The slave
- region server that's contacted doesn't answer the RPC, so the master
- region server will sleep and retry up to a configured number of times.
- If the slave RS still isn't available, the master cluster RS will select a
- new subset of RS to replicate to and will retry sending the buffer of
- edits.
-
- In the meantime, the WALs will be rolled and stored in a queue in
- ZooKeeper. Logs that are archived by their region server (archiving is
- basically moving a log from the region server's logs directory to a
- central logs archive directory) will update their paths in the in-memory
- queue of the replicating thread.
-
- When the slave cluster is finally available, the buffer will be applied
- the same way as during normal processing. The master cluster RS will then
- replicate the backlog of logs.
-
- This section describes in depth how each of replication's internal
- features operates.
-
- HBase replication maintains all of its state in Zookeeper. By default, this state is
- contained in the base znode:
-
- /hbase/replication
-
- There are two major child znodes in the base replication znode: the Peers
- znode and the RS znode.
-
- The peers znode contains a list of all peer replication clusters and the
- current replication state of those clusters. It has one child peer znode
- for each peer cluster. The peer znode is named with the cluster id provided
- by the user in the HBase shell. The value of the peer znode contains
- the peer's cluster key provided by the user in the HBase Shell. The cluster key
- contains a list of zookeeper nodes in the cluster's quorum, the client port for the
- zookeeper quorum, and the base znode for HBase
- (i.e. "zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase").
-   /hbase/replication/peers
-     /1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
-     /2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
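
For illustration, the peer shown above could be registered from Java with the ReplicationAdmin client (the HBase shell command add_peer takes the same id and cluster key); a minimal sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class AddPeerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    // Peer id "1" and the slave's cluster key: quorum, client port, base znode.
    admin.addPeer("1", "zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase");
  }
}
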
- Each of these peer znodes has a child znode that indicates whether or not
- replication is enabled on that peer cluster. These peer-state znodes do not
- have child znodes and simply contain a boolean value (i.e. ENABLED or DISABLED).
- This value is read/maintained by the ReplicationPeer.PeerStateTracker class.
-   /hbase/replication/peers
-     /1/peer-state [Value: ENABLED]
-     /2/peer-state [Value: DISABLED]
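
A minimal sketch of toggling that peer-state value, assuming the same ReplicationAdmin client (the shell commands enable_peer and disable_peer have the same effect):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class TogglePeerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    admin.disablePeer("2");  // sets the peer-state znode of peer 2 to DISABLED
    admin.enablePeer("2");   // sets it back to ENABLED
  }
}
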
- The rs znode contains a list of all outstanding HLog files in the cluster
- that need to be replicated. The list is divided into a set of queues organized by
- region server and the peer cluster the region server is shipping the HLogs to. The
- rs znode has one child znode for each region server in the cluster. The child
- znode name is simply the region server name (a concatenation of the region server's
- hostname, client port and start code). These region servers could either be dead or alive.
-   /hbase/replication/rs
-     /hostname.example.org,6020,1234
-     /hostname2.example.org,6020,2856
- Within each region server znode, the region server maintains a set of HLog replication
- queues. Each region server has one queue for every peer cluster it replicates to.
- These queues are represented by child znodes named using the cluster id of the peer
- cluster they represent (see the peer znode section).
-   /hbase/replication/rs
-     /hostname.example.org,6020,1234
-       /1
-       /2
- Each queue has one child znode for every HLog that still needs to be replicated.
- The value of these HLog child znodes is the latest position that has been replicated.
- This position is updated every time an HLog entry is replicated.
-   /hbase/replication/rs
-     /hostname.example.org,6020,1234
-       /1
-         23522342.23422 [VALUE: 254]
-         12340993.22342 [VALUE: 0]
- All of the base znode names are configurable through parameters:
-   Parameter                               | Default Value
-   ----------------------------------------|--------------
-   zookeeper.znode.parent                  | /hbase
-   zookeeper.znode.replication             | replication
-   zookeeper.znode.replication.peers       | peers
-   zookeeper.znode.replication.peers.state | peer-state
-   zookeeper.znode.replication.rs          | rs
- The default replication znode structure looks like the following:
-   /hbase/replication/peers/{peerId}/peer-state
-   /hbase/replication/rs
- When a master cluster RS initiates a replication source to a slave cluster,
- it first connects to the slave's ZooKeeper ensemble using the provided
- cluster key (that key is composed of the value of hbase.zookeeper.quorum,
- zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It
- then scans the "rs" directory to discover all the available sinks
- (region servers that are accepting incoming streams of edits to replicate)
- and will randomly choose a subset of them using a configured
- ratio (which has a default value of 10%). For example, if a slave
- cluster has 150 machines, 15 will be chosen as potential recipients for
- edits that this master cluster RS will be sending. Since this is done by all
- master cluster RSs, the probability that all slave RSs are used is very high,
- and this method works for clusters of any size. For example, a master cluster
- of 10 machines replicating to a slave cluster of 5 machines with a ratio
- of 10% means that the master cluster RSs will each choose one machine
- at random, so the chance of overlap and thus of full usage of the slave
- cluster is higher.
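
A small sketch of that sink selection, under the assumption that the list of slave region servers has already been read from the peer's rs znode (the names here are illustrative, not the actual HBase code):

import java.util.Collections;
import java.util.List;

public class SinkSelectionSketch {
  /** Keep a random subset of the slave's region servers, e.g. ratio = 0.1 for 10%. */
  static List<String> chooseSinks(List<String> slaveRegionServers, float ratio) {
    Collections.shuffle(slaveRegionServers);
    // Keep at least one sink even for very small slave clusters.
    int nbSinks = Math.max(1, (int) Math.ceil(slaveRegionServers.size() * ratio));
    return slaveRegionServers.subList(0, nbSinks);
  }
}
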
- A ZK watcher is placed on the ${zookeeper.znode.parent}/rs node of
- the slave cluster by each of the master cluster's region servers.
- This watch is used to monitor changes in the composition of the
- slave cluster. When nodes are removed from the slave cluster (or
- if nodes go down and/or come back up), the master cluster's region
- servers will respond by selecting a new pool of slave region servers
- to replicate to.
- Every master cluster RS has its own znode in the replication znodes hierarchy.
- It contains one znode per peer cluster (if there are 5 slave clusters, 5 znodes
- are created), and each of these contains a queue
- of HLogs to process. Each of these queues will track the HLogs created
- by that RS, but they can differ in size. For example, if one slave
- cluster becomes unavailable for some time then its HLogs should not be deleted,
- so they need to stay in the queue (while the others are processed).
- See the section named "Region server failover" for an example.
- When a source is instantiated, it contains the current HLog that the
- region server is writing to. During log rolling, the new file is added
- to the queue of each slave cluster's znode just before it's made available.
- This ensures that all the sources are aware that a new log exists
- before the HLog is able to append edits into it, but this operation is
- now more expensive.
- The queue items are discarded when the replication thread cannot read
- more entries from a file (because it reached the end of the last block)
- and there are other files in the queue.
- This means that if a source is up-to-date and replicates from the log
- that the region server writes to, reading up to the "end" of the
- current file won't delete the item in the queue.
- When a log is archived (because it's not used anymore or because there are
- too many of them per hbase.regionserver.maxlogs, typically because the insertion
- rate is faster than region flushing), it will notify the source threads that the path
- for that log changed. If a particular source is already done with
- it, it will just ignore the message. If the log is in the queue, the path
- will be updated in memory. If the log is currently being replicated,
- the change will be done atomically so that the reader doesn't try to
- open the file when it has already been moved. Also, moving a file is a NameNode
- operation, so if the reader is currently reading the log, it won't
- generate any exception.
- By default, a source will try to read from a log file and ship log
- entries as fast as possible to a sink. This is first limited by the
- filtering of log entries; only KeyValues that are scoped GLOBAL and
- that don't belong to catalog tables will be retained. A second limit
- is imposed on the total size of the list of edits to replicate per slave,
- which by default is 64MB. This means that a master cluster RS with 3 slaves
- will use at most 192MB to store data to replicate. This doesn't account for
- the data that was filtered out but not yet garbage collected.
- Once the maximum size of edits has been buffered or the reader hits the end
- of the log file, the source thread will stop reading and will choose
- at random a sink to replicate to (from the list that was generated by
- keeping only a subset of slave RSs). It will directly issue an RPC to
- the chosen machine and will wait for the method to return. If the RPC is
- successful, the source will determine whether the current file has been emptied
- or whether it should continue to read from it. If the former, it will delete
- the znode in the queue. If the latter, it will register the new offset
- in the log's znode. If the RPC threw an exception, the source will retry
- up to 10 times before trying to find a different sink.
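
An illustrative sketch of that ship-and-retry loop; shipBuffer() and chooseNewSink() are stand-ins for the real RPC and sink re-selection, not actual HBase methods.

public class ShipEditsSketch {
  static final int MAX_RETRIES_PER_SINK = 10;

  static void shipWithRetries(java.util.List<Object> buffer) throws InterruptedException {
    int attempt = 0;
    while (true) {
      try {
        shipBuffer(buffer);       // synchronous RPC to the currently chosen slave RS
        return;                   // success: the caller records the new offset in the log's znode
      } catch (Exception e) {
        attempt++;
        if (attempt >= MAX_RETRIES_PER_SINK) {
          chooseNewSink();        // give up on this sink and pick another slave RS
          attempt = 0;
        }
        Thread.sleep(1000);       // back off before the next attempt
      }
    }
  }

  // Stand-ins for the real RPC and sink re-selection.
  static void shipBuffer(java.util.List<Object> buffer) throws Exception { }
  static void chooseNewSink() { }
}
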
- If replication isn't enabled, the master's log-cleaning thread will
- delete old logs using a configured TTL. This doesn't work well with
- replication, since archived logs past their TTL may still be in a
- queue. Thus, the default behavior is augmented so that if a log is
- past its TTL, the cleaning thread will look up every queue until it
- finds the log (while caching the ones it finds). If it's not found,
- the log will be deleted. The next time it has to look for a log,
- it will first use its cache.
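
A sketch of that augmented cleaning check, with illustrative names only; it is meant to show the TTL-plus-queues logic, not the actual log cleaner implementation.

import java.util.HashSet;
import java.util.Set;

public class ReplicationLogCleanerSketch {
  // Cache of log names known to still sit in some replication queue.
  private final Set<String> queuedLogsCache = new HashSet<String>();

  /** Called for a log that is already past its TTL. */
  boolean isLogDeletable(String logName) {
    if (queuedLogsCache.contains(logName)) {
      return false;                                       // still referenced by a queue
    }
    queuedLogsCache.addAll(scanAllReplicationQueues());   // re-read the queues from ZooKeeper
    return !queuedLogsCache.contains(logName);            // delete only if no queue references it
  }

  // Stand-in for walking every region server's queues under /hbase/replication/rs.
  Set<String> scanAllReplicationQueues() {
    return new HashSet<String>();
  }
}
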
- As long as region servers don't fail, keeping track of the logs in ZK
- doesn't add any value. Unfortunately, they do fail, and since ZooKeeper
- is highly available, we can count on it and its semantics to help us
- manage the transfer of the queues.
- All the master cluster RSs keep a watcher on every other one of them to be
- notified when one dies (just like the master does). When that happens,
- they all race to create a znode called "lock" inside the dead RS' znode
- that contains its queues. The one that creates it successfully will
- proceed by transferring all the queues to its own znode (one by one,
- since ZK doesn't support the rename operation) and will delete all the
- old ones when it's done. The recovered queues' znodes will be named
- with the id of the slave cluster appended with the name of the dead
- server.
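
For illustration, the "race to lock" can be pictured with the plain ZooKeeper client API: whichever region server creates the lock znode first wins and takes over the dead server's queues (the path below is an example).

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class QueueFailoverSketch {
  static boolean tryLockDeadRsQueues(ZooKeeper zk, String deadRsZnode) throws Exception {
    try {
      zk.create(deadRsZnode + "/lock", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      return true;   // we won the race: copy the queues, then delete the old znodes
    } catch (KeeperException.NodeExistsException e) {
      return false;  // another region server is already transferring the queues
    }
  }
}
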
- Once that is done, the master cluster RS will create one new source thread per
- copied queue, and each of them will follow the read/filter/ship pattern.
- The main difference is that those queues will never have new data since
- they don't belong to their new region server, which means that when
- the reader hits the end of the last log, the queue's znode will be
- deleted and the master cluster RS will close that replication source.
- For example, consider a master cluster with 3 region servers that's
- replicating to a single slave with id '2'. The following hierarchy
- represents what the znodes layout could be at some point in time. We
- can see the RSs' znodes all contain a "peers" znode that contains a
- single queue. The znode names in the queues represent the actual file
- names on HDFS in the form "address,port.timestamp".
-   /hbase/replication/rs/
-     1.1.1.1,60020,123456780/
-       2/
-         1.1.1.1,60020.1234 (Contains a position)
-         1.1.1.1,60020.1265
-     1.1.1.2,60020,123456790/
-       2/
-         1.1.1.2,60020.1214 (Contains a position)
-         1.1.1.2,60020.1248
-         1.1.1.2,60020.1312
-     1.1.1.3,60020,123456630/
-       2/
-         1.1.1.3,60020.1280 (Contains a position)
- Now let's say that 1.1.1.2 loses its ZK session. The survivors will race
- to create a lock, and for some reason 1.1.1.3 wins. It will then start
- transferring all the queues to its local peers znode by appending the
- name of the dead server. Right before 1.1.1.3 is able to clean up the
- old znodes, the layout will look like the following:
-   /hbase/replication/rs/
-     1.1.1.1,60020,123456780/
-       2/
-         1.1.1.1,60020.1234 (Contains a position)
-         1.1.1.1,60020.1265
-     1.1.1.2,60020,123456790/
-       lock
-       2/
-         1.1.1.2,60020.1214 (Contains a position)
-         1.1.1.2,60020.1248
-         1.1.1.2,60020.1312
-     1.1.1.3,60020,123456630/
-       2/
-         1.1.1.3,60020.1280 (Contains a position)
-
-       2-1.1.1.2,60020,123456790/
-         1.1.1.2,60020.1214 (Contains a position)
-         1.1.1.2,60020.1248
-         1.1.1.2,60020.1312
- Some time later, but before 1.1.1.3 is able to finish replicating the
- last HLog from 1.1.1.2, let's say that it dies too (also, some new logs
- were created in the normal queues). The last RS will then try to lock
- 1.1.1.3's znode and will begin transferring all the queues. The new
- layout will be:
-   /hbase/replication/rs/
-     1.1.1.1,60020,123456780/
-       2/
-         1.1.1.1,60020.1378 (Contains a position)
-
-       2-1.1.1.3,60020,123456630/
-         1.1.1.3,60020.1325 (Contains a position)
-         1.1.1.3,60020.1401
-
-       2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
-         1.1.1.2,60020.1312 (Contains a position)
-     1.1.1.3,60020,123456630/
-       lock
-       2/
-         1.1.1.3,60020.1325 (Contains a position)
-         1.1.1.3,60020.1401
-
-       2-1.1.1.2,60020,123456790/
-         1.1.1.2,60020.1312 (Contains a position)
- Yes, this is for much later.
- You can use the HBase-provided utility called CopyTable from the package
- org.apache.hadoop.hbase.mapreduce in order to have a distcp-like tool to
- bulk copy data.
- Yes, this behavior would help a lot but it's not currently available
- in HBase (BatchUpdate had that, but it was lost in the new API).
- Yes. See HDFS-2757.
-This information has been moved to the Cluster Replication section of the Apache HBase Reference Guide.