Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2DA5310926 for ; Fri, 4 Apr 2014 22:35:52 +0000 (UTC) Received: (qmail 97510 invoked by uid 500); 4 Apr 2014 22:35:50 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 97473 invoked by uid 500); 4 Apr 2014 22:35:50 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 97385 invoked by uid 99); 4 Apr 2014 22:35:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2014 22:35:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E273194355D; Fri, 4 Apr 2014 22:35:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Fri, 04 Apr 2014 22:35:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: ACCUMULO-378 Add details on replication "bookkeeping" on the master cluster. ACCUMULO-378 Add details on replication "bookkeeping" on the master cluster. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de7f591a Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de7f591a Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de7f591a Branch: refs/heads/ACCUMULO-378 Commit: de7f591ab6f818e1967f0d4d0e266a803b6f086d Parents: 13561eb Author: Josh Elser Authored: Thu Apr 3 17:14:39 2014 -0400 Committer: Josh Elser Committed: Fri Apr 4 16:52:12 2014 -0400 ---------------------------------------------------------------------- .../resources/design/ACCUMULO-378-design.mdtext | 663 +++++++++++++------ 1 file changed, 453 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/de7f591a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext ---------------------------------------------------------------------- diff --git a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext index 23a85d1..28a2187 100644 --- a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext +++ b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext @@ -1,210 +1,453 @@ -Accumulo Multi-DataCenter Replication -===================================== - -ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is written to one Accumulo instance will automatically be replicated to a separate Accumulo instance. - - -Justification -------------- - -Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the redundancy of Hadoop in regards to the single point of failure which the namenode previously was. 
Despite this, there is always a varying amount of required administrative intervention to ensure that failure does not result in data loss: userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), “expected” hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures (switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances; however, this is not sufficient for multiple reasons with the biggest reason being a lack of automated replication. - - -Background ----------- - -Apache HBase has had master-master replication, cyclic replication and multi-slave replication since 0.92. This satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which both replicate to each other. Both systems can service new writes and will update their “view” of a table from one another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the master-master strategy in which we may have ultimately have a system which replicates to a system that it receives data from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic region or data center. Multi-slave replication is a relatively simple in that a single master system will replicate to multiple slaves instead of just one. - - -While these are relatively different to one another, I believe most can be satisfied through a single, master-push, replication implementation. Although, the proposed data structure should also be capable of supporting a slave-pull strategy. - - -Implementation --------------- - -As a first implementation, I will prototype a single master with multiple slave replication strategy. This should grant us the most flexibility and the most functionality. The general implementation should be capable of application to the other replication structures (master-master and cyclic-replication). I’ll outline a simple master-slave replication use case, followed by application of this approach to replication cycles and master-master replication. This approach does not consider conditional mutations. - -### Replication Framework - -In an attempt to be as clear as possible, I’ll use the following terminology when explaining the implementation: master will refer to the “master” Accumulo cluster (the system accepting new writes), slave will refer to the “slave” Accumulo cluster (the system which does not receive new data through the Accumulo client API, but only from master through replication). The design results in an eventual consistency model of replication which will allow for slaves to be offline and the online master to still process new updates. - - -In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the slave. In practice, this new file can either be an RFile that was bulk-imported to master or this can be a write-ahead log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is being written to Accumulo is it written to a sorted, in-memory map and an append-only WAL file. 
While the in-memory map provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new updates at the RFile level. As such, this proposed implementation uses the WAL as the transport “file format”[a]. While it is noted that in sending a WAL to multiple slaves, each slave will need to reprocess each WAL to make Mutations to apply whereas they could likely be transformed once, that is left as a future optimization. - - -To increase the speed in eventual consistency can be achieved, WAL offsets can be tracked to begin the replication process before a WAL is closed. We can bin these mutations together for a lazy replication which can be combined to each target server which amortizes the cost into a single write set message. It is not apparent that this requires co-location within each source tablet in the Accumulo metadata table which means that the worry of inadvertent errors caused by placing this data in the metadata table is entirely removed. - - -In every replication graph, which consists of master(s) and slave(s), each system should have a unique identifier. It is desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems participating. - - -These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate some data when that request already contains the current cluster’s identifier. In other words, data we try to replicate will contain a linked list of identifiers with the provenance of where that data came and each cluster can make the determination of whether or not it has seen this data already (and thus needs to process and propagate it). This also lets us treat replication rules as a graph which grants us a common terminology to use when describing replication. - - -This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation should not prohibit alternative approaches to replication such as other databases or filesystems. - - -### Replication Strategy Implementation - - -Henceforth, both of the RFiles and WAL files that need replication can be treated as a chunk of data. This chunk references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC costs. - - -#### Make the master aware of file to replicate - - -Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector needs to be modified to not delete these files on the master’s HDFS instance until these files are replicated (copied to the slave). 
- - -#### Choose local TabletServer to perform replication - - -The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds some, choose a TabletServer to perform the replication to all slaves. The master should use a FATE operation to manage the state machine of this replication process. The expected principles, such as exponential backoff on network errors, should be followed. When all slaves have reported successfully receiving the file, the master can remove the REPL column for the given chunk. On the slave, before beginning transfer, the slave should ascertain a new local, unique filename to use for the remote file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate Tablet. If the slave is also a master (replicating to other nodes), the replicated data should create a new REPL column in the slave’s table to repeat the replication process, adding in its cluster identifier to the provenance list. Otherwise, the file can be a c andidate for deletion by the garbage collection. - - -The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data. This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local, then we gain the same locality perks. - - -#### Recurse - - -In our simple master and slave replication scheme, we are done after the new updates are made available on slave. As aforementioned, it is relatively easy to “schedule” replication of a new file on slave because we just repeat the same process that master did to replicate to slave in the first place. - - -Configuration -------------- - -Replication can be configured on a per-locality-group, replicated that locality group to one or more slaves. Given that we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult. Configuration requires new configuration variables that need to be introduced to support the necessary information. Each slave is defined with a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master. The API should ease configuration on replication across all locality groups. Replication cannot be configured on the root or metadata table. - - -Site-wide: -# The name and location of other clusters -instance.cluster.$name.zookeepers=zk1,zk2,zk3[c] -# The name of this cluster -instance.replication.name=my_cluster_name[d] - -Per-table: -# Declare the locality group(s) that should be replicated and the clusters that they should be replicated to -table.replication.$locality_group_name=cluster1,cluster2,... - - -Shell commands can also be created to make this configuration easier. - - -definecluster cluster_name zookeeper_quorum - - -e.g. definecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181 - - - - -deletecluster cluster_name zookeeper_quorum - - -e.g. deletecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181 - - - - -enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name - - -e.g. enablereplication -t foo -lg cf1 slave1 - enablereplication -t foo -all-loc-groups slave1 - - - - - - -disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name - - -e.g. disablereplication -t foo -lg cf1 slave1 - disablereplication -t foo -all-loc-groups slave1 - - -For slaves, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only. 
This likely requires custom configuration and some ZK state to not accept regular API connections. Should be exposed/controllable by the shell, too. - -Common Questions ----------------- - -*How do conditional mutations work with this approach?* - - -I have absolutely no idea. They likely won’t work out in master-master situations, but might be ok in master-slave cases? - - -*How does replication work on a table which already contains data?* - - -When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to support this as the existing importtable and exporttable already provide support to do this. - - -*When I update a table property on the master, will it propagate to the slave?* - - -There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the other. - - -As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may not be desired (e.g. split threshold, compression codecs, etc). Another argument against could be age-off. If a replica cluster is not the same size as the production cluster (which is extremely plausible) you would not want the same age-off rules for both the production and replica. - - -An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also difficult in master-master situations as we don’t have a shared ZooKeeper instance that we can use to reliably commit these changes. - - -*What happens in master-master if two Keys are exactly the same with different values?* - - -Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528 - - -*Did you come up with this all on your own?* - - -Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other Accumulo devs that I’ve bounced these ideas off of (too many to enumerate). 
- - - - -Goals ------ - * Master-Slave configuration that doesn’t exclude future master-master work - * Per locality-group replication configuration - * Shell administration of replication - * Accumulo Monitor integration/insight to replication status - * State machines for lifecycle of chunks - * Versionable (read-as protobuf) datastructure to track chunk metadata - * Thrift for RPC - * Replication does not require “closed” files (can send incremental updates to slaves) - * Ability to replicate “live inserts” and “bulk imports” - * Provide replication interface with Accumulo->Accumulo implementation - * Do not rely on active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer - * Use FATE where applicable - * Gracefully handle offline slaves - * Implement read-only variant Master/TabletServer[e] - - -Non-Goals ---------- - * Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities) - * Wire security between master and slave - * Support replication of encrypted data[f] - * Replication of existing data (use importtable & exporttable) - * Enforce replication of table configuration - - -Footnotes ---------- - -*footnotes from google doc, markdown does not support footnotes, left as -is when exported to text from google docs * - -* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html -[a]While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could contain that? -[b]This approach makes the assumption that we only begin the replication process when a WAL is closed. This is likely too long of a period of time: an offset and length likely needed to be interested to decrease latency. -[c]This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to force users to configure it correctly? -[d]Same as instance.cluster.$name: Do we need to enforce these values? -[e]This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation -[f]While not in the original scope, it is definitely of great concern. +Accumulo Multi-DataCenter Replication +===================================== + +ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is +written to one Accumulo instance will automatically be replicated to a separate Accumulo instance. + + +Justification +------------- + +Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the +potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the +redundancy of Hadoop in regards to the single point of failure which the namenode previously was. Despite this, there is +always a varying amount of required administrative intervention to ensure that failure does not result in data loss: +userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), “expected” +hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures +(switches and routers). 
Accumulo currently has the ability for manual snapshots/copies across multiple instances; however, this is not sufficient for multiple reasons, with the biggest reason being a lack of automated replication.


Background
----------

Apache HBase has had master-master replication, cyclic replication and multi-slave replication since 0.92. This satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which both replicate to each other. Both systems can service new writes and will update their “view” of a table from one another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the master-master strategy in which we may ultimately have a system which replicates to a system that it receives data from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of this. More complicated examples can be envisioned when dealing with multiple replicas inside one geographic region or data center. Multi-slave replication is relatively simple in that a single master system will replicate to multiple slaves instead of just one.


While these are relatively different from one another, I believe most can be satisfied through a single, master-push, replication implementation. That said, the proposed data structure should also be capable of supporting a slave-pull strategy.


Implementation
--------------

As a first implementation, I will prototype a single-master, multiple-slave replication strategy. This should grant us the most flexibility and the most functionality. The general implementation should be capable of application to the other replication structures (master-master and cyclic replication). I’ll outline a simple master-slave replication use case, followed by application of this approach to replication cycles and master-master replication. This approach does not consider conditional mutations.


### Replication Framework

In an attempt to be as clear as possible, I’ll use the following terminology when explaining the implementation: master will refer to the “master” Accumulo cluster (the system accepting new writes), and slave will refer to the “slave” Accumulo cluster (the system which does not receive new data through the Accumulo client API, but only from master through replication). The design results in an eventual consistency model of replication which will allow slaves to be offline while the online master still processes new updates.


In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the slave. In practice, this new file can either be an RFile that was bulk-imported to master or a write-ahead log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is being written to Accumulo, it is written to a sorted, in-memory map and an append-only WAL file. While the in-memory map provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new updates at the RFile level. As such, this proposed implementation uses the WAL as the transport “file format”[a]. It is noted that, in sending a WAL to multiple slaves, each slave will need to reprocess the WAL to make Mutations to apply, whereas the WAL could likely be transformed once; that is left as a future optimization.
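
To illustrate the slave-side reprocessing mentioned above, here is a minimal sketch of how replayed key-values might be turned back into Mutations and applied through the standard BatchWriter API. The ReplicatedKeyValue record and WalChunkApplier class are hypothetical names for this sketch; only the Mutation/BatchWriter calls are existing Accumulo client API.

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.security.ColumnVisibility;

    // Hypothetical container for one key-value parsed out of a replicated WAL chunk.
    class ReplicatedKeyValue {
      byte[] row, cf, cq, visibility, value;
      long timestamp;
    }

    class WalChunkApplier {
      // Rebuild Mutations from the shipped WAL data and apply them on the slave.
      void apply(Connector slaveConn, String table, Iterable<ReplicatedKeyValue> chunk) throws Exception {
        BatchWriter bw = slaveConn.createBatchWriter(table, new BatchWriterConfig());
        try {
          for (ReplicatedKeyValue kv : chunk) {
            Mutation m = new Mutation(kv.row);
            m.put(kv.cf, kv.cq, new ColumnVisibility(kv.visibility), kv.timestamp, kv.value);
            bw.addMutation(m);
          }
        } finally {
          bw.close();
        }
      }
    }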
To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked to begin the replication process before a WAL is closed. We can bin these mutations together for lazy replication, combining them for each target server, which amortizes the cost into a single write-set message. It is not apparent that this requires co-location with each source tablet in the Accumulo metadata table, which means that the worry of inadvertent errors caused by placing this data in the metadata table is entirely removed.


In every replication graph, which consists of master(s) and slave(s), each system should have a unique identifier. It is desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems participating.


These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate some data when that request already contains the current cluster’s identifier. In other words, data we try to replicate will contain a linked list of identifiers with the provenance of where that data came from, and each cluster can make the determination of whether or not it has seen this data already (and thus needs to process and propagate it). This also lets us treat replication rules as a graph, which grants us a common terminology to use when describing replication.


This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation should not prohibit alternative approaches to replication such as other databases or filesystems.


### Replication Strategy Implementation


Henceforth, both the RFiles and WAL files that need replication can be treated as a chunk of data. This chunk references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC costs.


#### Make the master aware of file to replicate


Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector needs to be modified to not delete these files on the master’s HDFS instance until these files are replicated (copied to the slave).


#### Choose local TabletServer to perform replication


The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds some, it chooses a TabletServer to perform the replication to all slaves. The master should use a FATE operation to manage the state machine of this replication process. The expected principles, such as exponential backoff on network errors, should be followed. When all slaves have reported successfully receiving the file, the master can remove the REPL column for the given chunk.
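
Returning to the cluster-identifier provenance described earlier in this section, the check each cluster performs is small; a minimal sketch (class and method names are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    // Each replication request carries the ordered list of cluster identifiers it has passed through.
    class ReplicationProvenance {
      // True if this cluster has not yet seen the data and should apply and propagate it.
      static boolean shouldProcess(List<String> provenance, String localClusterName) {
        return !provenance.contains(localClusterName);
      }

      // Before forwarding the data onward, append this cluster's identifier to the provenance list.
      static List<String> appendLocal(List<String> provenance, String localClusterName) {
        List<String> updated = new ArrayList<String>(provenance);
        updated.add(localClusterName);
        return updated;
      }
    }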
On the slave, before beginning transfer, the slave should ascertain a new local, unique filename to use for the remote file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate Tablet. If the slave is also a master (replicating to other nodes), the replicated data should create a new REPL column in the slave’s table to repeat the replication process, adding in its cluster identifier to the provenance list. Otherwise, the file can be a candidate for deletion by the garbage collector.


The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data. This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local, then we gain the same locality perks.


#### Recurse


In our simple master and slave replication scheme, we are done after the new updates are made available on slave. As aforementioned, it is relatively easy to “schedule” replication of a new file on slave because we just repeat the same process that master did to replicate to slave in the first place.


### Master cluster replication “bookkeeping”


This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be replicated and when a file is safe to be removed.


Two key structures are used to implement this bookkeeping:


1. Tablet-level entry: tabletId repl:fully-qualified-file [] value


2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file clusterName:remoteTableID [] value


These two key structures will be outlined below, with “*repl* column” and “*~repl* row” denoting which is being referred to.


#### Data Structure in Value


To avoid the necessity of using conditional mutations or other “transaction-like” operations, we can use a combiner to generate an aggregate view of replication information. Protobuf is a decent choice; however, the description isn’t tied to any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary functionality:


 ``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been``
 ``// closed and will have no new data to replicate``

 ``State:Enum { OPEN, CLOSED }``

 ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }``


The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset value tracks what data has been replicated to the given cluster, while the replication_needed_offset value tracks how much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED and replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a read-only file and the WAL as an append-only file.
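
To make the two key structures above concrete, a rough sketch of how the corresponding metadata mutations might be built follows. The row/column layout is the one described above; the helper names and the serialized-status bytes are illustrative only.

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;

    class ReplicationBookkeeping {
      // Tablet-level entry: tabletId repl:fully-qualified-file [] -> serialized ReplicationStatus
      static Mutation tabletReplEntry(String tabletRow, String fileUri, byte[] serializedStatus) {
        Mutation m = new Mutation(tabletRow);
        m.put("repl", fileUri, new Value(serializedStatus));
        return m;
      }

      // Row-prefix entry: ~repl_fully-qualified-file clusterName:remoteTableID [] -> serialized ReplicationStatus
      static Mutation replRowEntry(String fileUri, String clusterName, String remoteTableId, byte[] serializedStatus) {
        Mutation m = new Mutation("~repl_" + fileUri);
        m.put(clusterName, remoteTableId, new Value(serializedStatus));
        return m;
      }
    }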
For *~repl* entries, the target clusterName and remote tableID would be stored in the key to preserve uniqueness. Using this information, we would be able to implement the following methods:


 ``bool isFullyReplicated(ReplicationStatus)``
 ``Pair<Long,Long> rangeNeedingReplication(ReplicationStatus)``


The isFullyReplicated method is straightforward: given the offset of data that needs to be replicated, the offset of data that has already been replicated, and whether the state is CLOSED, determine whether there is still more data for this ReplicationStatus that needs to be replicated for the given clusterName/tableID.


rangeNeedingReplication is a bit more complicated. Given the end of the range of data that has already been replicated and the end of the range of data that still needs replication, return a range of data that has not yet been replicated. For example, if key-values up to offset 100 in a WAL have already been replicated and key-values up to offset 300 are marked as needing replication, this method should return [101,300]. Ranges of data replicated and data needing replication must always be disjoint and contiguous to ensure that data is replayed in the correct order on the slave.


A Combiner is used to create a basic notion of “addition” and “subtraction”. We cannot use deletes to manage this without creating a custom iterator, which would not be desirable since it would be required to run over the entire accumulo.metadata table. Avoiding deletions except on cleanup is also desired to avoid handling the “tombstone’ing” of a future version of a Key. The addition operation occurs when new data is appended to the WAL, which signifies new data to be replicated; this equates to an addition to replication_needed_offset. The subtraction operation occurs when data from the WAL has been successfully replicated to the slave for this *~repl* record; this is implemented as an addition to replication_finished_offset.


When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets that would be tracked via the *repl* column will be added. As such, a ReplicationStatus “object” is a candidate for deletion when the state is CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state always takes precedence over OPEN. An addition after the state is CLOSED is an invalid operation and would be a logic error.


Consider the case of new data being ingested into the cluster; the following discrete steps should happen. Replication is assumed to be enabled so as not to distract from the actual steps. As previously mentioned, a combiner must be set on the *repl* column to aggregate the values and properly maintain replication state. The following is what a tserver will do.


1) When a new WAL is created at the request of a tserver, a *repl* column is created within the tablet’s row to track that this WAL will need to be replicated.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(state=OPEN)


2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current length of data written to the WAL that needs to be read for purposes of replication.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(addition + offset)


3) Eventually, the tablet server will finish using a WAL, minor-compact (minc) the contents of memory to disk, and mark the WAL as unused. This results in updating the state to CLOSED.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/… -> ReplicationStatus(state=CLOSED)
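
A plain-Java sketch of the combining rules just described may help pin down the arithmetic. In practice this logic would sit behind the serialized Value and an Accumulo Combiner; the class below is illustrative only and assumes offsets are reported as increments, as described above.

    // Illustrative only: mirrors the ReplicationStatus structure and combining rules described above.
    class ReplicationStatus {
      enum State { OPEN, CLOSED }

      State state = State.OPEN;
      long replicationNeededOffset = 0;   // data written to the WAL and awaiting replication
      long replicationFinishedOffset = 0; // data already replicated to the target cluster/table

      // Combiner semantics: additions accumulate, and CLOSED always wins over OPEN.
      static ReplicationStatus combine(ReplicationStatus a, ReplicationStatus b) {
        ReplicationStatus merged = new ReplicationStatus();
        merged.state = (a.state == State.CLOSED || b.state == State.CLOSED) ? State.CLOSED : State.OPEN;
        merged.replicationNeededOffset = a.replicationNeededOffset + b.replicationNeededOffset;
        merged.replicationFinishedOffset = a.replicationFinishedOffset + b.replicationFinishedOffset;
        return merged;
      }

      // Safe to delete once the WAL is closed and everything written has been replicated.
      boolean isFullyReplicated() {
        return state == State.CLOSED && replicationFinishedOffset == replicationNeededOffset;
      }

      // The range of offsets still needing replication, e.g. [101, 300] in the example above.
      long[] rangeNeedingReplication() {
        return new long[] {replicationFinishedOffset + 1, replicationNeededOffset};
      }
    }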
The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row entries for the file and where it should be replicated to. The high-level goals for this thread are as follows:


1) Create mutations for a WAL that outline where the file must be replicated to (cluster and remote tableID)


    INSERT
    *~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId -> ReplicationStatus(addition + offset)


2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the signal that the GC can then remove this file.


    DELETE
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/…


This can be accomplished with a single thread that scans the metadata table:


1) Construct a “snapshot” of tablet *repl* file entries with aggregated offsets, sorted by file:


    [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
     hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
     hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ...] ]


2) Begin scanning the *~repl* row-prefix with a Scanner, performing a merged read to join the “state” from the aggregated *repl* columns across tablets with the columns in the *~repl* row for the file.


    for each file in *~repl* rowspace:
        if all columns in *~repl*_file row isFullyReplicated:
            issue deletes for file in *repl* column for all tablets with references
            if delete of *repl* is successful:
                delete *~repl* row
        else if *~repl* row exists but no *repl* columns:
            // Catch failure case from first conditional
            delete *~repl* row
        else
            for each file in “snapshot” of *repl* columns:
                make mutation for *~repl*_file
                for each slave cluster in configuration:
                    if file should be replicated on slave:
                        add column for clusterid:remote_tableID -> RS


A Combiner should be set on all columns in the *~repl* prefix rowspace and on the *repl* column family to ensure that multiple runs of the described procedure, without actual replication occurring in between, still correctly aggregate the data that needs replication.
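
As a rough sketch of the snapshot-building half of that procedure, the scan over the *repl* column family could use the standard client Scanner API. The metadata table name and the repl column family are the ones proposed in this design, not an existing Accumulo schema, and the grouping structure is illustrative:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;

    class ReplWorkFinder {
      // Build the file -> [(tablet entry, aggregated status), ...] snapshot described in step 1.
      static Map<String,List<Entry<Key,Value>>> snapshotReplColumns(Connector conn) throws Exception {
        Scanner scanner = conn.createScanner("accumulo.metadata", Authorizations.EMPTY);
        scanner.fetchColumnFamily(new Text("repl")); // proposed column family, not an existing one
        Map<String,List<Entry<Key,Value>>> byFile = new HashMap<String,List<Entry<Key,Value>>>();
        for (Entry<Key,Value> entry : scanner) {
          // The column qualifier holds the fully-qualified WAL/RFile URI; the Value holds the
          // combiner-aggregated ReplicationStatus for that file within this tablet.
          String file = entry.getKey().getColumnQualifier().toString();
          List<Entry<Key,Value>> entries = byFile.get(file);
          if (entries == null) {
            entries = new ArrayList<Entry<Key,Value>>();
            byFile.put(file, entries);
          }
          entries.add(entry);
        }
        return byFile;
      }
    }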
Configuration
-------------

Replication can be configured on a per-locality-group basis, replicating that locality group to one or more slaves. Given that we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult. Configuration requires new configuration variables that need to be introduced to support the necessary information. Each slave is defined with a name and the zookeeper quorum of the remote cluster, used to locate the active Accumulo Master. The API should ease configuring replication across all locality groups. Replication cannot be configured on the root or metadata table.


Site-wide:

// The name and location of other clusters
instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
// The name of this cluster
instance.replication.name=my_cluster_name[d]

Per-table:

// Declare the locality group(s) that should be replicated and the clusters that they should be replicated to
table.replication.$locality_group_name=cluster1,cluster2,...


Shell commands can also be created to make this configuration easier.


definecluster cluster_name zookeeper_quorum


e.g. definecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181


deletecluster cluster_name zookeeper_quorum


e.g. deletecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181


enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name


e.g. enablereplication -t foo -lg cf1 slave1
     enablereplication -t foo --all-loc-groups slave1


disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name


e.g. disablereplication -t foo -lg cf1 slave1
     disablereplication -t foo --all-loc-groups slave1


For slaves, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only. This likely requires custom configuration and some ZooKeeper state to not accept regular API connections. This should be exposed/controllable by the shell, too.
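
For illustration, the per-table and site-wide settings above could also be applied programmatically through the existing property APIs. The property names themselves are the ones proposed in this design and do not exist in Accumulo today; only the setProperty calls are current client API:

    import org.apache.accumulo.core.client.Connector;

    class ReplicationConfigExample {
      static void configureReplication(Connector conn) throws Exception {
        // Site-wide: define the remote cluster named "slave" and name this cluster (proposed properties).
        conn.instanceOperations().setProperty("instance.cluster.slave.zookeepers",
            "slaveZK1:2181,slaveZK2:2181,slaveZK3:2181");
        conn.instanceOperations().setProperty("instance.replication.name", "my_cluster_name");

        // Per-table: replicate locality group cf1 of table foo to the cluster named "slave".
        // Shell equivalent: config -t foo -s table.replication.cf1=slave
        conn.tableOperations().setProperty("foo", "table.replication.cf1", "slave");
      }
    }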
Common Questions
----------------

*How do conditional mutations work with this approach?*


They do not. They will need to throw an Exception.


*How does replication work on a table which already contains data?*


When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to replicate existing data, as the existing importtable and exporttable commands already provide support for doing so.


*When I update a table property on the master, will it propagate to the slave?*


There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the other.


As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may not be desired (e.g. split threshold, compression codecs, etc.). Another argument against could be age-off. If a replica cluster is not the same size as the production cluster (which is extremely plausible), you would not want the same age-off rules for both the production and replica.


An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also difficult in master-master situations as we don’t have a shared ZooKeeper instance that we can use to reliably commit these changes.


*What happens in master-master if two Keys are exactly the same with different values?*


Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528


*Did you come up with this all on your own?*


Ha, no. Big thanks go out to HBase’s documentation, Enis Söztutar (HBase), and other Accumulo devs that I’ve bounced these ideas off of (too many to enumerate).


Goals
-----

1. Master-Slave configuration that doesn’t exclude future master-master work
2. Per locality-group replication configuration
3. Shell administration of replication
4. Accumulo Monitor integration/insight to replication status
5. State machines for lifecycle of chunks
6. Versionable (read-as protobuf) datastructure to track chunk metadata
7. Thrift for RPC
8. Replication does not require “closed” files (can send incremental updates to slaves)
9. Ability to replicate “live inserts” and “bulk imports”
10. Provide replication interface with Accumulo->Accumulo implementation
11. Do not rely on active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer
12. Use FATE where applicable
13. Gracefully handle offline slaves
14. Implement read-only variant Master/TabletServer[e]


Non-Goals
---------

1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities)
2. Wire security between master and slave
3. Support replication of encrypted data[f]
4. Replication of existing data (use importtable & exporttable)
5. Enforce replication of table configuration


References
----------

* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html

[a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could contain that?
[b] This approach makes the assumption that we only begin the replication process when a WAL is closed. This is likely too long of a period of time: an offset and length will likely need to be introduced to decrease latency.
[c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to force users to configure it correctly?
[d] Same as instance.cluster.$name: do we need to enforce these values?
[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation.
[f] While not in the original scope, it is definitely of great concern.