accumulo-commits mailing list archives

From els...@apache.org
Subject [2/3] git commit: ACCUMULO-378 Add details on replication "bookkeeping" on the master cluster.
Date Fri, 04 Apr 2014 22:35:48 GMT
ACCUMULO-378 Add details on replication "bookkeeping" on the master cluster.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de7f591a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de7f591a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de7f591a

Branch: refs/heads/ACCUMULO-378
Commit: de7f591ab6f818e1967f0d4d0e266a803b6f086d
Parents: 13561eb
Author: Josh Elser <elserj@apache.org>
Authored: Thu Apr 3 17:14:39 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Apr 4 16:52:12 2014 -0400

----------------------------------------------------------------------
 .../resources/design/ACCUMULO-378-design.mdtext | 663 +++++++++++++------
 1 file changed, 453 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/de7f591a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
----------------------------------------------------------------------
diff --git a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
index 23a85d1..28a2187 100644
--- a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
+++ b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
@@ -1,210 +1,453 @@
-Accumulo Multi-DataCenter Replication
-=====================================
-
-ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication
of tables. Data which is written to one Accumulo instance will automatically be replicated
to a separate Accumulo instance.
-
-
-Justification
--------------
-
-Losing an entire instance really stinks. In addition to natural disasters or facility problems,
Hadoop always has the potential for failure. In the newest versions of Hadoop, the high availability
(HA) namenode functionality increases the redundancy of Hadoop in regards to the single point
of failure which the namenode previously was. Despite this, there is always a varying amount
of required administrative intervention to ensure that failure does not result in data loss:
userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations),
“expected” hardware failures (hard drives), unexpected compute hardware failures (NICs,
CPU, Memory), and infrastructure failures (switches and routers). Accumulo currently has the
ability for manual snapshots/copies across multiple instances; however, this is not sufficient
for multiple reasons with the biggest reason being a lack of automated replication.
-
-
-Background
-----------
-
-Apache HBase has had master-master replication, cyclic replication and multi-slave replication
since 0.92. This satisfies a wide range of cross-site replication strategies. Master-master
replication lets us have two systems which both replicate to each other. Both systems can
service new writes and will update their “view” of a table from one another. Cyclic replication
allows us to have cycles in our replication graph. This is a generalization of the master-master
strategy in which we may ultimately have a system which replicates to a system that it
receives data from. A system with three masters, A, B and C, which replicate in a row (A to
B, B to C and C to A) is an example of this. More complicated examples of this can be envisioned
when dealing with multiple replicas inside one geographic region or data center. Multi-slave
replication is relatively simple in that a single master system will replicate to multiple
slaves instead of just one.
-
-
-While these are relatively different from one another, I believe most can be satisfied through
a single, master-push replication implementation, although the proposed data structure should
also be capable of supporting a slave-pull strategy.
-
-
-Implementation
---------------
-
-As a first implementation, I will prototype a single master with multiple slave replication
strategy. This should grant us the most flexibility and the most functionality. The general
implementation should be capable of application to the other replication structures (master-master
and cyclic-replication). I’ll outline a simple master-slave replication use case, followed
by application of this approach to replication cycles and master-master replication. This
approach does not consider conditional mutations.
-
-### Replication Framework
-
-In an attempt to be as clear as possible, I’ll use the following terminology when explaining
the implementation: master will refer to the “master” Accumulo cluster (the system accepting
new writes), slave will refer to the “slave” Accumulo cluster (the system which does not
receive new data through the Accumulo client API, but only from master through replication).
The design results in an eventual consistency model of replication which will allow for slaves
to be offline and the online master to still process new updates.
-
-
-In the simplest notion, when a new file is created by master, we want to ensure that this
file is also sent to the slave. In practice, this new file can either be an RFile that was
bulk-imported to master or this can be a write-ahead log (WAL) file. The bulk-imported RFile
is the easy case, but the WAL case merits additional explanation. While data is being written
to Accumulo, it is written to a sorted, in-memory map and an append-only WAL file. While the
in-memory map provides a very useful interface for the TabletServer to use for scans and compactions,
it is difficult to extract new updates at the RFile level. As such, this proposed implementation
uses the WAL as the transport “file format”[a]. While it is noted that in sending a WAL
to multiple slaves, each slave will need to reprocess each WAL to make Mutations to apply
whereas they could likely be transformed once, that is left as a future optimization.
-
-
-To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked
to begin the replication process before a WAL is closed. We can bin these mutations together
for a lazy replication which can be combined to each target server which amortizes the cost
into a single write set message. It is not apparent that this requires co-location within
each source tablet in the Accumulo metadata table which means that the worry of inadvertent
errors caused by placing this data in the metadata table is entirely removed.
-
-
-In every replication graph, which consists of master(s) and slave(s), each system should
have a unique identifier. It is desirable to be able to uniquely identify each system, and
each system should have knowledge of the other systems participating.
-
-
-These identifiers also make implementing cyclic replication easier, as a cluster can ignore
any requests to replicate some data when that request already contains the current cluster’s
identifier. In other words, data we try to replicate will contain a linked list of identifiers
with the provenance of where that data came from, and each cluster can make the determination of
whether or not it has seen this data already (and thus needs to process and propagate it).
This also lets us treat replication rules as a graph which grants us a common terminology
to use when describing replication.
-
-
-This framework provides a general strategy to allow pluggable replication strategies to export
data out of an Accumulo cluster. An AccumuloReplicationStrategy is the only presently targeted
replication strategy; however, the implementation should not prohibit alternative approaches
to replication such as other databases or filesystems.
-
-
-### Replication Strategy Implementation
-
-
-Henceforth, both RFiles and WAL files that need replication can be treated as a chunk
of data. This chunk references a start offset and length from the source (RFile or WAL) which
needs to be replicated. This has the nice property of being able to use a Combiner to combine
multiple, sequential chunks into one larger chunk to amortize RPC costs.
-
-
-#### Make the master aware of file to replicate
-
-
-Let us define a column family that is used to denote a chunk that needs to be replicated:
REPL. We first need to let master know that it has a new chunk which needs to be replicated.
When the file comes from a bulk-import, we need to create a new entry in the !METADATA table
for the given tablet with the REPL column family. If the file is a WAL, we also want to write
an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column
qualifier. The Value can contain some serialized data structure to track cluster replication
provenance and offset values. Each row (tablet) in the !METADATA table will contain zero to
many REPL columns. As such, the garbage collector needs to be modified to not delete these
files on the master’s HDFS instance until these files are replicated (copied to the slave).
-
-
-#### Choose local TabletServer to perform replication
-
-
-The Accumulo Master can have a thread that scans the replication table to look for chunks
to replicate. When it finds some, choose a TabletServer to perform the replication to all
slaves. The master should use a FATE operation to manage the state machine of this replication
process. The expected principles, such as exponential backoff on network errors, should be
followed. When all slaves have reported successfully receiving the file, the master can remove
the REPL column for the given chunk. On the slave, before beginning transfer, the slave should
ascertain a new local, unique filename to use for the remote file. When the transfer is complete,
the file should be treated like log recovery and brought into the appropriate Tablet. If the
slave is also a master (replicating to other nodes), the replicated data should create a new
REPL column in the slave’s table to repeat the replication process, adding in its cluster
identifier to the provenance list. Otherwise, the file can be a candidate for deletion by the garbage collector.
-
-
-The tserver chosen to replicate the data from the master cluster should ideally be the tserver
that created that data. This helps reduce the complexity of dealing with locality later on.
If the HDFS blocks written by the tserver are local, then we gain the same locality perks.
-
-
-#### Recurse
-
-
-In our simple master and slave replication scheme, we are done after the new updates are
made available on slave. As aforementioned, it is relatively easy to “schedule” replication
of a new file on slave because we just repeat the same process that master did to replicate
to slave in the first place.
-
-
-Configuration
--------------
-
-Replication can be configured on a per-locality-group basis, replicating that locality group to
one or more slaves. Given that we have dynamic column families, trying to track per-column-family
replication would be unnecessarily difficult. Configuration requires new configuration variables
that need to be introduced to support the necessary information. Each slave is defined with
a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master.
The API should ease configuration on replication across all locality groups. Replication cannot
be configured on the root or metadata table.
-
-
-Site-wide:
-# The name and location of other clusters
-instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
-# The name of this cluster
-instance.replication.name=my_cluster_name[d]
-
-Per-table:
-# Declare the locality group(s) that should be replicated and the clusters that they should
be replicated to
-table.replication.$locality_group_name=cluster1,cluster2,...
-
-
-Shell commands can also be created to make this configuration easier.
-
-
-definecluster cluster_name zookeeper_quorum
-
-
-e.g.  definecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181
-
-
-
-
-deletecluster cluster_name zookeeper_quorum
-
-
-e.g.  deletecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181
-
-
-
-
-enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
-
-
-e.g. enablereplication -t foo -lg cf1 slave1
-       enablereplication -t foo -all-loc-groups slave1
-
-
-
-
-
-
-disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
-
-
-e.g. disablereplication -t foo -lg cf1 slave1
-       disablereplication -t foo -all-loc-groups slave1
-
-
-For slaves, we likely do not want to allow users to perform writes against the cluster. Thus,
they should be read-only. This likely requires custom configuration and some ZK state to not
accept regular API connections. This should be exposed/controllable by the shell, too.
-
-Common Questions
-----------------
-
-*How do conditional mutations work with this approach?*
-
-
-I have absolutely no idea. They likely won’t work out in master-master situations, but
might be ok in master-slave cases?
-
-
-*How does replication work on a table which already contains data?*
-
-
-When replication is enabled on a table, all new data will be replicated. This implementation
does not attempt to replicate pre-existing data, as the existing importtable and exporttable
commands already provide support for that.
-
-
-*When I update a table property on the master, will it propagate to the slave?*
-
-
-There are both arguments for and against this. We likely want to revisit this later as a
configuration parameter that could allow the user to choose if this should happen. We should
avoid implementations that would tie us to one or the other.
-
-
-As an argument against this, consider a production and a backup cluster where the backup
cluster is smaller in number of nodes, but contains more disks. Despite wanting to replicate
the data in a table, the configuration of that table may not be desired (e.g. split threshold,
compression codecs, etc). Another argument against could be age-off. If a replica cluster
is not the same size as the production cluster (which is extremely plausible) you would not
want the same age-off rules for both the production and replica.
-
-
-An argument for this feature is that you would want custom compaction iterators (as a combiner,
for example) to only be configured on a table once. You would want these iterators to appear
on all replicas. Such an implementation is also difficult in master-master situations as we
don’t have a shared ZooKeeper instance that we can use to reliably commit these changes.
-
-
-*What happens in master-master if two Keys are exactly the same with different values?*
-
-
-Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528
-
-
-*Did you come up with this all on your own?*
-
-
-Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other
Accumulo devs that I’ve bounced these ideas off of (too many to enumerate).
-
-
-
-
-Goals
------
- * Master-Slave configuration that doesn’t exclude future master-master work
- * Per locality-group replication configuration
- * Shell administration of replication
- * Accumulo Monitor integration/insight to replication status
- * State machines for lifecycle of chunks
- * Versionable (read-as protobuf) datastructure to track chunk metadata
- * Thrift for RPC
- * Replication does not require “closed” files (can send incremental updates to slaves)
- * Ability to replicate “live inserts” and “bulk imports”
- * Provide replication interface with Accumulo->Accumulo implementation
- * Do not rely on active Accumulo Master to perform replication (send or receive) -- delegate
to a TabletServer
- * Use FATE where applicable
- * Gracefully handle offline slaves
- * Implement read-only variant Master/TabletServer[e]
-
-
-Non-Goals
----------
- * Replicate on smaller granularity than locality group (not individual colfams/colquals
or based on visibilities)
- * Wire security between master and slave
- * Support replication of encrypted data[f]
- * Replication of existing data (use importtable & exporttable)
- * Enforce replication of table configuration
-
-
-Footnotes
----------
-
-*footnotes from google doc, markdown does not support footnotes, left as
-is when exported to text from google docs *
-
-* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html
-[a]While the WAL is a useful file format for shipping updates (an append-only file), the
actual LogFileKey and LogFileValue pairs may not be sufficient? Might need some extra data
internally? Maybe the DFSLogger header could contain that?
-[b]This approach makes the assumption that we only begin the replication process when a WAL
is closed. This is likely too long of a period of time: an offset and length likely need
to be inserted to decrease latency.
-[c]This needs to be consistent across clusters. Do we need to control access to ensure that
it is? Is it excessive to force users to configure it correctly?
-[d]Same as instance.cluster.$name: Do we need to enforce these values?
-[e]This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal
for the first implementation
-[f]While not in the original scope, it is definitely of great concern.
+Accumulo Multi-DataCenter Replication
+=====================================
+
+ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication
of tables. Data which is
+written to one Accumulo instance will automatically be replicated to a separate Accumulo
instance.
+
+
+Justification
+-------------
+
+Losing an entire instance really stinks. In addition to natural disasters or facility problems,
Hadoop always has the
+potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode
functionality increases the
+redundancy of Hadoop in regards to the single point of failure which the namenode previously
was. Despite this, there is
+always a varying amount of required administrative intervention to ensure that failure does
not result in data loss:
+userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem
implementations), “expected”
+hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory),
and infrastructure failures
+(switches and routers). Accumulo currently has the ability for manual snapshots/copies across
multiple instances;
+however, this is not sufficient for multiple reasons with the biggest reason being a lack
of automated replication.
+
+
+Background
+----------
+
+Apache HBase has had master-master replication, cyclic replication and multi-slave replication
since 0.92. This
+satisfies a wide range of cross-site replication strategies. Master-master replication lets
us have two systems which
+both replicate to each other. Both systems can service new writes and will update their “view”
of a table from one
+another. Cyclic replication allows us to have cycles in our replication graph. This is a
generalization of the
+master-master strategy in which we may ultimately have a system which replicates to
a system that it receives data
+from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and
C to A) is an example of
+this. More complicated examples of this can be envisioned when dealing with multiple replicas
inside one geographic
+region or data center. Multi-slave replication is relatively simple in that a single master
system will replicate to
+multiple slaves instead of just one.
+
+
+While these are relatively different from one another, I believe most can be satisfied through
a single, master-push
+replication implementation, although the proposed data structure should also be capable
of supporting a
+slave-pull strategy.
+
+
+Implementation
+--------------
+
+As a first implementation, I will prototype a single master with multiple slave replication
strategy. This should grant
+us the most flexibility and the most functionality. The general implementation should be
capable of application to the
+other replication structures (master-master and cyclic-replication). I’ll outline a simple
master-slave replication use
+case, followed by application of this approach to replication cycles and master-master replication.
This approach does
+not consider conditional mutations.
+
+
+### Replication Framework
+
+In an attempt to be as clear as possible, I’ll use the following terminology when explaining
the implementation: master
+will refer to the “master” Accumulo cluster (the system accepting new writes), slave
will refer to the “slave” Accumulo
+cluster (the system which does not receive new data through the Accumulo client API, but
only from master through
+replication). The design results in an eventual consistency model of replication
which will allow for slaves to
+be offline and the online master to still process new updates.
+
+
+In the simplest notion, when a new file is created by master, we want to ensure that this
file is also sent to the
+slave. In practice, this new file can either be an RFile that was bulk-imported to master
or this can be a write-ahead
+log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional
explanation. While data is
+being written to Accumulo, it is written to a sorted, in-memory map and an append-only WAL
file. While the in-memory map
+provides a very useful interface for the TabletServer to use for scans and compactions, it
is difficult to extract new
+updates at the RFile level. As such, this proposed implementation uses the WAL as the transport
“file format”[a]. While
+it is noted that in sending a WAL to multiple slaves, each slave will need to reprocess each
WAL to make Mutations to
+apply whereas they could likely be transformed once, that is left as a future optimization.
+
+
+To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked
to begin the replication
+process before a WAL is closed. We can bin these mutations together for a lazy replication
which can be combined to each
+target server which amortizes the cost into a single write set message. It is not apparent
that this requires
+co-location within each source tablet in the Accumulo metadata table which means that the
worry of inadvertent errors
+caused by placing this data in the metadata table is entirely removed.
+
+
+In every replication graph, which consists of master(s) and slave(s), each system should
have a unique identifier. It is
+desirable to be able to uniquely identify each system, and each system should have knowledge
of the other systems
+participating.
+
+
+These identifiers also make implementing cyclic replication easier, as a cluster can ignore
any requests to replicate
+some data when that request already contains the current cluster’s identifier. In other
words, data we try to replicate
+will contain a linked list of identifiers with the provenance of where that data came from, and
each cluster can make the
+determination of whether or not it has seen this data already (and thus needs to process
and propagate it). This also
+lets us treat replication rules as a graph which grants us a common terminology to use when
describing replication.
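The provenance check described above can be sketched as follows. This is a hypothetical Python illustration; the function names, list representation, and cluster identifiers are assumptions for exposition, not part of the design or of Accumulo:

```python
# Hypothetical sketch of cyclic-replication provenance checks; names are
# illustrative and not Accumulo APIs.

def should_replicate(provenance, local_cluster):
    """A cluster ignores a request whose provenance already names it."""
    return local_cluster not in provenance

def propagate(provenance, local_cluster):
    """Before forwarding data onward, a cluster appends its own identifier."""
    return provenance + [local_cluster]

# Cycle A -> B -> C -> A: by the time the data returns to A, the provenance
# list contains "A", so A neither processes nor propagates it again.
chain = propagate(propagate(propagate([], "A"), "B"), "C")
assert chain == ["A", "B", "C"]
assert not should_replicate(chain, "A")   # cycle broken at A
assert should_replicate(chain, "D")       # a new slave still accepts it
```

This is what lets replication rules form an arbitrary graph without data circulating forever.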
+
+
+This framework provides a general strategy to allow pluggable replication strategies to export
data out of an Accumulo
+cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy;
however, the implementation
+should not prohibit alternative approaches to replication such as other databases or filesystems.
+
+
+### Replication Strategy Implementation
+
+
+Henceforth, both RFiles and WAL files that need replication can be treated as a chunk
of data. This chunk
+references a start offset and length from the source (RFile or WAL) which needs to be replicated.
This has the nice
+property of being able to use a Combiner to combine multiple, sequential chunks into one
larger chunk to amortize RPC
+costs.
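The combining of sequential chunks might look roughly like the following. This is an illustrative Python sketch; the `(offset, length)` tuple representation and the `merge_chunks` name are assumptions, not the actual Combiner implementation:

```python
# Sketch of merging sequential chunks from the same source file to amortize
# RPC costs; representation and names are hypothetical.

def merge_chunks(chunks):
    """Collapse contiguous (offset, length) chunks into larger chunks."""
    merged = []
    for off, length in sorted(chunks):
        if merged and off <= merged[-1][0] + merged[-1][1]:
            # This chunk starts at or before the end of the previous one:
            # extend the previous chunk to cover both.
            end = max(merged[-1][0] + merged[-1][1], off + length)
            merged[-1] = (merged[-1][0], end - merged[-1][0])
        else:
            merged.append((off, length))
    return merged

# Two contiguous chunks collapse into one; a gap keeps chunks separate.
assert merge_chunks([(0, 100), (100, 50), (200, 10)]) == [(0, 150), (200, 10)]
```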
+
+
+#### Make the master aware of file to replicate
+
+
+Let us define a column family that is used to denote a chunk that needs to be replicated:
REPL. We first need to let
+master know that it has a new chunk which needs to be replicated. When the file comes from
a bulk-import, we need to
+create a new entry in the !METADATA table for the given tablet with the REPL column family.
If the file is a WAL, we
+also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will
be stored in the column
+qualifier. The Value can contain some serialized data structure to track cluster replication
provenance and offset
+values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns.
As such, the garbage collector
+needs to be modified to not delete these files on the master’s HDFS instance until these
files are replicated (copied to
+the slave).
+
+
+#### Choose local TabletServer to perform replication
+
+
+The Accumulo Master can have a thread that scans the replication table to look for chunks
to replicate. When it finds
+some, choose a TabletServer to perform the replication to all slaves. The master should use
a FATE operation to manage
+the state machine of this replication process. The expected principles, such as exponential
backoff on network errors,
+should be followed. When all slaves have reported successfully receiving the file, the
master can remove the REPL
+column for the given chunk.
+
+
+On the slave, before beginning transfer, the slave should ascertain a new local, unique filename
to use for the remote
+file. When the transfer is complete, the file should be treated like log recovery and brought
into the appropriate
+Tablet. If the slave is also a master (replicating to other nodes), the replicated data should
create a new REPL column
+in the slave’s table to repeat the replication process, adding in its cluster identifier
to the provenance list.
+Otherwise, the file can be a candidate for deletion by the garbage collector.
+
+
+The tserver chosen to replicate the data from the master cluster should ideally be the tserver
that created that data.
+This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written
by the tserver are local,
+then we gain the same locality perks.
+
+
+#### Recurse
+
+
+In our simple master and slave replication scheme, we are done after the new updates are
made available on slave. As
+aforementioned, it is relatively easy to “schedule” replication of a new file on slave
because we just repeat the same
+process that master did to replicate to slave in the first place.
+
+
+### Master cluster replication “bookkeeping”
+
+
+This section outlines the steps on the master cluster to manage the lifecycle of data: when/what
data needs to be
+replicated and when a file is safe to be removed.
+
+
+Two key structures are used to implement this bookkeeping:
+
+
+1. Tablet-level entry: tabletId        repl:fully-qualified-file        []        value
+
+
+2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file
+clusterName:remoteTableID        []        value
+
+
+These two key structures will be outlined below, with “*repl* column” and “*~repl*
row” denoting which is being referred to.
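The two key layouts above could be sketched as simple (row, column family, column qualifier) tuples. This is an illustrative Python rendering, assuming the layouts as described; the tablet id value and helper names are hypothetical, not the actual metadata schema:

```python
# Illustrative key layouts for the two bookkeeping structures; a sketch of the
# design's description, not Accumulo's real metadata schema.

def tablet_repl_key(tablet_id, wal_uri):
    # 1. Tablet-level entry: row = tabletId, colfam = "repl",
    #    colqual = fully-qualified file
    return (tablet_id, "repl", wal_uri)

def work_repl_key(wal_uri, cluster_name, remote_table_id):
    # 2. Row-prefix space: row = "~repl_" + fully-qualified file,
    #    colfam = clusterName, colqual = remoteTableID, so there is one
    #    entry per (file, replication target) pair
    return ("~repl_" + wal_uri, cluster_name, remote_table_id)

wal = "hdfs://localhost:8020/accumulo/wal/example"  # hypothetical path
assert tablet_repl_key("2;m", wal) == ("2;m", "repl", wal)
assert work_repl_key(wal, "slave1", "5")[0].startswith("~repl_")
```

Keeping the target cluster and remote table id in the key preserves uniqueness so that one file's replication state can be tracked per target independently.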
+
+
+#### Data Structure in Value
+
+
+To avoid the necessity of using conditional mutations or other “transaction-like” operations,
we can use a combiner to
+generate an aggregate view of replication information. Protobuf is a decent choice; however,
the description isn’t tied to
+any implementation. I believe a Combiner used in conjunction with the following data structure
provides all necessary
+functionality:
+
+
+        ``// Tracks general lifecycle of the data: file is open and might have new data to
replicate, or the file has been``
+        ``// closed and will have no new data to replicate``
+
+
+        ``State:Enum { OPEN, CLOSED }``
+
+
+        ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset
}``
+
+
+The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The
replication_finished_offset
+value tracks what data has been replicated to the given cluster, while the replication_needed_offset
value tracks how
+much data has been written to the WAL that is ready for replication. replication_finished_offset
should always be less
+than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED
and
+replication_needed_offset is initialized to the length of the RFile. In this context, one
can consider the RFile as a
+read-only file and the WAL as an append-only file.
+
+
+For *~repl* entries, the target clusterName and remote tableId would be stored in the key
to preserve uniqueness. Using
+this information, we would be able to implement the following methods:
+
+
+    ``bool        isFullyReplicated(ReplicationStatus)``
+    ``Pair<long,long> rangeNeedingReplication(ReplicationStatus)``
+
+
+The isFullyReplicated method is straightforward: given the values for start/finish stored
for data that needs to be
+replicated, the values for start/finish stored for data that has been replicated, and whether
the state is CLOSED, determine whether
+any data for this ReplicationStatus still needs to be replicated for the given clusterName/tableID.
+
+
+rangeNeedingReplication is a bit more complicated. Given the end of a range of data that
has already been replicated,
+and the end of a range of data that still needs replication, return a range of data that
has
+not yet been replicated. For example, if keyvalues up to offset 100 in a WAL have already
been
+replicated and keyvalues up to offset 300 are marked as needing replication, this method
should
+return [101,300]. Ranges of data replicated, and data needing replication must always be
+disjoint and contiguous to ensure that data is replayed in the correct order on the slave.
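The two methods and the worked example above can be sketched directly. This is an illustrative Python version of the pseudocode, assuming the semantics described (offsets are inclusive record positions); it is not the real implementation:

```python
# Sketch of ReplicationStatus and the two query methods described above;
# field names follow the design's pseudocode, everything else is assumed.
from dataclasses import dataclass

@dataclass
class ReplicationStatus:
    state: str                        # "OPEN" or "CLOSED"
    replication_needed_offset: int    # data written and ready to replicate
    replication_finished_offset: int  # data already replicated

def is_fully_replicated(rs):
    # Fully replicated only once the WAL is CLOSED and the finished offset
    # has caught up to the needed offset.
    return (rs.state == "CLOSED"
            and rs.replication_finished_offset >= rs.replication_needed_offset)

def range_needing_replication(rs):
    # The next contiguous, not-yet-replicated range, or None if caught up.
    if rs.replication_finished_offset >= rs.replication_needed_offset:
        return None
    return (rs.replication_finished_offset + 1, rs.replication_needed_offset)

# Worked example from the text: replicated through offset 100, offsets up to
# 300 marked as needing replication -> [101, 300] still needs shipping.
rs = ReplicationStatus("OPEN", 300, 100)
assert range_needing_replication(rs) == (101, 300)
assert not is_fully_replicated(rs)
assert is_fully_replicated(ReplicationStatus("CLOSED", 300, 300))
```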
+
+
+A Combiner is used to create a basic notion of “addition” and “subtraction”.
We cannot use deletes to manage
+this without creating a custom iterator, which would not be desirable since it would be required
to run over the entire
+accumulo.metadata table. Avoiding deletions except on cleanup is also desired to avoid
handling “tombstone’ing” a
+future version of a Key. The addition operation is when new data is appended to the WAL, which
signifies new data to be
+replicated. This equates to an addition to replication_needed_offset. The subtraction operation
is when data from the
+WAL has been successfully replicated to the slave for this *~repl* record. This is implemented
as an addition to the
+replication_finished_offset.
+
+
+When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and
no new offsets that
+would be tracked via the *repl* column will be added. As such, a ReplicationStatus “object” is a candidate
for deletion when the state is
+CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of
CLOSED for state is always
+propagated over the OPEN state. An addition after the state is CLOSED is an invalid operation
and would be a logic error.
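The aggregation rules above (offsets accumulate by addition; CLOSED always wins over OPEN) can be sketched as a simple reduction. This is an illustrative Python sketch, not Accumulo's Combiner API; the dict field names are shorthand assumptions for the ReplicationStatus fields:

```python
# Sketch of the Combiner's reduce over ReplicationStatus values; "needed"
# and "finished" abbreviate the two offset fields, names are hypothetical.

def combine(a, b):
    # CLOSED is always propagated over OPEN; offset deltas are summed.
    state = "CLOSED" if "CLOSED" in (a["state"], b["state"]) else "OPEN"
    return {
        "state": state,
        "needed": a["needed"] + b["needed"],       # new WAL data appended
        "finished": a["finished"] + b["finished"], # data confirmed replicated
    }

new_wal  = {"state": "OPEN",   "needed": 0,   "finished": 0}
appended = {"state": "OPEN",   "needed": 300, "finished": 0}
shipped  = {"state": "OPEN",   "needed": 0,   "finished": 300}
closed   = {"state": "CLOSED", "needed": 0,   "finished": 0}

# Reducing the full mutation history yields a CLOSED, fully-caught-up status,
# which is exactly the condition for deleting the ReplicationStatus "object".
agg = combine(combine(combine(new_wal, appended), shipped), closed)
assert agg == {"state": "CLOSED", "needed": 300, "finished": 300}
```

Because the reduction is commutative and associative over these mutations, no conditional mutations or transactions are needed to maintain the state.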
+
+
+Consider the case of new data being ingested to the cluster: the following discrete steps
should happen. The
+assumption that replication is enabled is made to not distract from the actual steps. As
previously mentioned, a
+combiner must be set on the *repl* column to aggregate the values to properly maintain replication
state. The following is
+what a tserver will do.
+
+
+1) When a new WAL is created at the request of a tserver, an entry is created for a
*repl* column within the tablet’s
+row to track that this WAL will need to be replicated.
+
+
+        INSERT
+        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...  -> ReplicationStatus(state=OPEN)
+
+
+2) As the tserver finishes commits to this WAL, it should submit a new mutation to track the current length of
+the WAL data it just wrote, which needs to be read for purposes of replication.
+
+
+        INSERT
+        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...        -> ReplicationStatus(addition
+        offset)
+
+
+3) Eventually, the tablet server will finish using the WAL, minor-compact (minc) the contents of memory to disk,
+and mark the WAL as unused. This results in updating the state to CLOSED.
+
+
+        INSERT
+        tablet repl:hdfs://localhost:8020/accumulo/.../wal/…        -> ReplicationStatus(state=CLOSED)
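

The three updates above can be sketched as follows in plain Java. This is not the real Accumulo Mutation API (which would use `Mutation.put(colfam, colqual, value)`); the `Update` record and helper names are hypothetical stand-ins to show the shape of each bookkeeping write.

```java
import java.util.ArrayList;
import java.util.List;

public class TserverReplBookkeeping {
    // A stand-in for an Accumulo mutation: row, column family, qualifier, value.
    record Update(String row, String colfam, String colqual, String value) {}

    static Update walCreated(String tablet, String wal) {
        // 1) New WAL: mark it OPEN so the master knows it will need replication.
        return new Update(tablet, "repl", wal, "state=OPEN");
    }

    static Update commitsWritten(String tablet, String wal, long bytesAppended) {
        // 2) After commits: record how much new WAL data awaits replication.
        return new Update(tablet, "repl", wal, "needed+=" + bytesAppended);
    }

    static Update walClosed(String tablet, String wal) {
        // 3) WAL no longer in use (memory minor-compacted): mark it CLOSED.
        return new Update(tablet, "repl", wal, "state=CLOSED");
    }

    public static void main(String[] args) {
        String wal = "hdfs://localhost:8020/accumulo/wal/tserver1/some-uuid";
        List<Update> updates = new ArrayList<>();
        updates.add(walCreated("2;row", wal));
        updates.add(commitsWritten("2;row", wal, 4096));
        updates.add(walClosed("2;row", wal));
        updates.forEach(u ->
            System.out.println(u.row() + " " + u.colfam() + ":" + u.colqual() + " -> " + u.value()));
    }
}
```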
+
+
+The master also needs a new thread to process the *repl* columns across all tablets in a
table and create *~repl* row
+entries for the file and where it should be replicated to. The high-level goals for this
thread are as follows:
+
+
+1) Create mutations for a WAL that outline where the file must be replicated to (cluster name and remote tableID)
+
+
+        INSERT
+        *~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId        ->
ReplicationStatus(addition
+        offset)
+
+
+2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has
been replicated). This is the
+sign that the GC can then remove this file.
+
+
+        DELETE
+        tablet repl:hdfs://localhost:8020/accumulo/.../wal/… 
+
+
+This can be accomplished with a single thread that scans the metadata table:
+
+
+1) Construct a “snapshot” of the tablet *repl* file entries with aggregated offsets, sorted by file.
+
+
+        [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
+         hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
+         hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ... }]
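

Building this snapshot amounts to grouping the sorted scan results by WAL file. A minimal sketch in plain Java (the `ReplEntry` record is a hypothetical stand-in for a scanned metadata entry, not an Accumulo type):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplSnapshotSketch {
    // One scanned tablet *repl* entry: tablet row, WAL file, aggregated status.
    record ReplEntry(String tablet, String walFile, String status) {}

    // Entries arrive sorted from a metadata scan; group them by WAL file so
    // each file maps to every tablet that still references it.
    static Map<String, List<ReplEntry>> snapshotByFile(List<ReplEntry> scanned) {
        Map<String, List<ReplEntry>> byFile = new LinkedHashMap<>();
        for (ReplEntry e : scanned) {
            byFile.computeIfAbsent(e.walFile(), f -> new ArrayList<>()).add(e);
        }
        return byFile;
    }

    public static void main(String[] args) {
        List<ReplEntry> scanned = List.of(
            new ReplEntry("tablet1", "file1", "OPEN"),
            new ReplEntry("tablet2", "file1", "OPEN"),
            new ReplEntry("tablet5", "file3", "CLOSED"));
        snapshotByFile(scanned).forEach((f, es) ->
            System.out.println(f + " => " + es.size() + " tablet(s)"));
    }
}
```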
+
+
+2) Begin scanning the *~repl* row-prefix with a Scanner, performing a merged read to join the “state” from the
+aggregated *repl* column across tablets with the columns in the *~repl* row for the file.
+
+
+    for each file in *~repl* rowspace:
+        if all columns in *~repl*_file row isFullyReplicated:
+            issue deletes for file in *repl* column for all tablets with references
+            if delete of *repl* is successful:
+                delete *~repl* row
+        else if *~repl* row exists but no *repl* columns:
+            // Catch failure case from first conditional
+            delete *~repl* row
+        else:
+            for each file in “snapshot” of *repl* columns:
+                make mutation for *~repl*_file
+                for each slave cluster in configuration:
+                    if file should be replicated on slave:
+                        add column for clusterid:remote_tableID -> RS
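

The `isFullyReplicated` check that gates cleanup above can be sketched as follows in plain Java. The names are hypothetical; the point is that a WAL's *repl* references (and its *~repl* row) may be deleted only once every target cluster/table column reports a CLOSED state with matching offsets.

```java
import java.util.Map;

public class ReplCleanupSketch {
    // One per clusterName:tableId column under the *~repl* row for a file.
    record TargetStatus(boolean closed, long neededOffset, long finishedOffset) {
        boolean isFullyReplicated() {
            return closed && finishedOffset == neededOffset;
        }
    }

    // True when it is safe to delete the tablet *repl* references and the
    // *~repl* row, i.e. the GC may then remove the WAL file itself.
    static boolean safeToDelete(Map<String, TargetStatus> targets) {
        return !targets.isEmpty()
            && targets.values().stream().allMatch(TargetStatus::isFullyReplicated);
    }

    public static void main(String[] args) {
        Map<String, TargetStatus> file1 = Map.of(
            "slaveA:3", new TargetStatus(true, 1000, 1000),
            "slaveB:7", new TargetStatus(true, 1000, 1000));
        Map<String, TargetStatus> file2 = Map.of(
            "slaveA:3", new TargetStatus(true, 1000, 400)); // replication still behind
        System.out.println(safeToDelete(file1) + " " + safeToDelete(file2));
    }
}
```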
+
+
+A Combiner should be set on all columns in the *~repl* prefix rowspace and on the *repl* colfam so that multiple
+runs of the described procedure, even without actual replication occurring, still correctly aggregate the data
+that needs replication.
+
+
+Configuration
+-------------
+
+
+Replication can be configured per locality group, replicating that locality group to one or more slaves. Given
+that we have dynamic column families, trying to track per-column-family replication would be unnecessarily
+difficult. New configuration variables need to be introduced to support the necessary information. Each slave is
+defined with a name and the ZooKeeper quorum of the remote cluster, which is used to locate the active Accumulo
+Master. The API should make it easy to configure replication across all locality groups. Replication cannot be
+configured on the root or metadata table.
+
+
+Site-wide:
+
+
+        // The name and location of other clusters
+        instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
+        // The name of this cluster
+        instance.replication.name=my_cluster_name[d]
+
+
+Per-table:
+
+
+        // Declare the locality group(s) that should be replicated and the clusters they should be replicated to
+        table.replication.$locality_group_name=cluster1,cluster2,...
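

As a concrete sketch of how these properties compose (the cluster, table, and locality group names here are hypothetical), a master cluster named production replicating locality group cf1 of table foo to a slave named backup might carry:

        # Site-wide on the master cluster
        instance.replication.name=production
        instance.cluster.backup.zookeepers=backupZK1:2181,backupZK2:2181,backupZK3:2181

        # On table foo
        table.replication.cf1=backup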
+
+
+Shell commands can also be created to make this configuration easier.
+
+
+definecluster cluster_name zookeeper_quorum
+
+
+e.g.  definecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181
+
+
+deletecluster cluster_name zookeeper_quorum
+
+
+e.g.  deletecluster slave slaveZK1:2181,slaveZK2:2181,slaveZK3:2181
+
+
+enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
+
+
+e.g.  enablereplication -t foo -lg cf1 slave1
+      enablereplication -t foo --all-loc-groups slave1
+
+
+disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
+
+
+e.g.  disablereplication -t foo -lg cf1 slave1
+      disablereplication -t foo --all-loc-groups slave1
+
+
+For slaves, we likely do not want to allow users to perform writes against the cluster; thus, slaves should be
+read-only. This likely requires custom configuration and some ZooKeeper state so that regular API connections are
+not accepted. This should be exposed/controllable by the shell, too.
+
+
+Common Questions
+----------------
+
+
+*How do conditional mutations work with this approach?*
+
+
+They do not. They will need to throw an Exception.
+
+
+*How does replication work on a table which already contains data?*
+
+
+When replication is enabled on a table, all new data will be replicated, but this implementation does not attempt
+to replicate pre-existing data, as the existing importtable and exporttable commands already provide support for
+doing so.
+
+
+*When I update a table property on the master, will it propagate to the slave?*
+
+
+There are both arguments for and against this. We likely want to revisit this later as a
configuration parameter that
+could allow the user to choose if this should happen. We should avoid implementations that
would tie us to one or the
+other.
+
+
+As an argument against this, consider a production and a backup cluster where the backup
cluster is smaller in number of
+nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration
of that table may
+not be desired (e.g. split threshold, compression codecs, etc). Another argument against
could be age-off. If a replica
+cluster is not the same size as the production cluster (which is extremely plausible) you
would not want the same
+age-off rules for both the production and replica.
+
+
+An argument for this feature is that you would want custom compaction iterators (as a combiner,
for example) to only be
+configured on a table once. You would want these iterators to appear on all replicas. Such
an implementation is also
+difficult in master-master situations as we don’t have a shared ZooKeeper instance that
we can use to reliably commit
+these changes.
+
+
+*What happens in master-master if two Keys are exactly the same with different values?*
+
+
+Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528
+
+
+*Did you come up with this all on your own?*
+
+
+Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other
Accumulo devs that I’ve bounced
+these ideas off of (too many to enumerate).
+
+
+
+
+Goals
+-----
+
+
+1. Master-Slave configuration that doesn’t exclude future master-master work
+2. Per locality-group replication configuration
+3. Shell administration of replication
+4. Accumulo Monitor integration/insight into replication status
+5. State machines for the lifecycle of chunks
+6. Versionable (read-as protobuf) datastructure to track chunk metadata
+7. Thrift for RPC
+8. Replication does not require “closed” files (can send incremental updates to slaves)
+9. Ability to replicate “live inserts” and “bulk imports”
+10. Provide a replication interface with an Accumulo->Accumulo implementation
+11. Do not rely on the active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer
+12. Use FATE where applicable
+13. Gracefully handle offline slaves
+14. Implement read-only variant Master/TabletServer[e]
+
+
+Non-Goals
+---------
+1. Replicate on smaller granularity than locality group (not individual colfams/colquals
or based on visibilities)
+2. Wire security between master and slave
+3. Support replication of encrypted data[f]
+4. Replication of existing data (use importtable & exporttable)
+5. Enforce replication of table configuration
+
+
+References
+----------
+
+
+* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html
+[a] While the WAL is a useful file format for shipping updates (an append-only file), the
actual LogFileKey and
+LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the
DFSLogger header could
+contain that? 
+[b] This approach makes the assumption that we only begin the replication process when a WAL is closed. This is
+likely too long a period of time: an offset and length likely need to be introduced to decrease latency.
+[c] This needs to be consistent across clusters. Do we need to control access to ensure that
it is? Is it excessive to
+force users to configure it correctly? 
+[d] Same as instance.cluster.$name: Do we need to enforce these values? 
+[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal
for the first implementation
+[f] While not in the original scope, it is definitely of great concern.

