accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [3/6] accumulo git commit: ACCUMULO-4451 Fix line-endings
Date Fri, 09 Sep 2016 19:19:06 GMT
ACCUMULO-4451 Fix line-endings

* Use UNIX line endings
* Remove BOM in unicode text file
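
The commit message does not say how the conversion was done. As a rough illustration only, the two bullet points above could be reproduced with something like the following sketch, which assumes a UTF-8 file with CRLF (or lone CR) line endings. `normalize_text` is a hypothetical helper written for this note and is not part of Accumulo or of this commit.

```python
import codecs


def normalize_text(data: bytes) -> bytes:
    """Drop a leading UTF-8 BOM and convert CRLF / lone CR endings to LF.

    Hypothetical helper for illustration; it works on raw bytes so that
    no other characters in the file are re-encoded.
    """
    if data.startswith(codecs.BOM_UTF8):
        data = data[len(codecs.BOM_UTF8):]
    # Replace CRLF first so the lone-CR pass cannot produce doubled newlines.
    return data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")


if __name__ == "__main__":
    sample = codecs.BOM_UTF8 + b"Licensed to the ASF\r\nunder one or more\r\n"
    print(normalize_text(sample))
```

Because a change of this kind touches the terminator of every line, a diff of the result reports each line as both removed and added, which matches the equal insertion and deletion counts in the summary below.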


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/200be5f2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/200be5f2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/200be5f2

Branch: refs/heads/master
Commit: 200be5f29a8150b9cf2dd2042cfc1d79345ac23e
Parents: 2be9231
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Fri Sep 9 15:13:53 2016 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Fri Sep 9 15:13:53 2016 -0400

----------------------------------------------------------------------
 .../resources/design/ACCUMULO-378-design.mdtext | 936 +++++++++----------
 1 file changed, 468 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/200be5f2/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
----------------------------------------------------------------------
diff --git a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
index 7905840..1521110 100644
--- a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
+++ b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
@@ -1,468 +1,468 @@
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Accumulo Multi-DataCenter Replication
-=====================================
-
-ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is
-written to one Accumulo instance will automatically be replicated to a separate Accumulo instance.
-
-
-Justification
--------------
-
-Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the
-potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the
-redundancy of Hadoop in regards to the single point of failure which the namenode previously was. Despite this, there is
-always a varying amount of required administrative intervention to ensure that failure does not result in data loss:
-userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), “expected”
-hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures
-(switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances;
-however, this is not sufficient for multiple reasons with the biggest reason being a lack of automated replication.
-
-
-Background
-----------
-
-Apache HBase has had master-master replication, cyclic replication and multi-peer replication since 0.92. This
-satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which
-both replicate to each other. Both systems can service new writes and will update their “view” of a table from one
-another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the
-master-master strategy in which we may have ultimately have a system which replicates to a system that it receives data
-from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of
-this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic
-region or data center. Multi-peer replication is a relatively simple in that a single master system will replicate to
-multiple peers instead of just one.
-
-
-While these are relatively different to one another, I believe most can be satisfied through a single, master-push,
-      replication implementation. Although, the proposed data structure should also be capable of supporting a
-      peer-pull strategy.
-
-
-Implementation
---------------
-
-As a first implementation, I will prototype a single master with multiple peer replication strategy. This should grant
-us the most flexibility and the most functionality. The general implementation should be capable of application to the
-other replication structures (master-master and cyclic-replication). I’ll outline a simple master-peer replication use
-case, followed by application of this approach to replication cycles and master-master replication. This approach does
-not consider conditional mutations.
-
-
-### Replication Framework
-
-In an attempt to be as clear as possible, I’ll use the following terminology when explaining the implementation: master
-will refer to the “master” Accumulo cluster (the system accepting new writes), peer will refer to the “peer” Accumulo
-cluster (the system which does not receive new data through the Accumulo client API, but only from master through
-        replication). The design results in an eventual consistency model of replication which will allow for peers to
-be offline and the online master to still process new updates.
-
-
-In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the
-peer. In practice, this new file can either be an RFile that was bulk-imported to master or this can be a write-ahead
-log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is
-being written to Accumulo is it written to a sorted, in-memory map and an append-only WAL file. While the in-memory map
-provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new
-updates at the RFile level. As such, this proposed implementation uses the WAL as the transport “file format”[a]. While
-it is noted that in sending a WAL to multiple peers, each peer will need to reprocess each WAL to make Mutations to
-apply whereas they could likely be transformed once, that is left as a future optimization.
-
-
-To increase the speed in eventual consistency can be achieved, WAL offsets can be tracked to begin the replication
-process before a WAL is closed. We can bin these mutations together for a lazy replication which can be combined to each
-target server which amortizes the cost into a single write set message. It is not apparent that this requires
-co-location within each source tablet in the Accumulo metadata table which means that the worry of inadvertent errors
-caused by placing this data in the metadata table is entirely removed.
-
-
-In every replication graph, which consists of master(s) and peer(s), each system should have a unique identifier. It is
-desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems
-participating.
-
-
-These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate
-some data when that request already contains the current cluster’s identifier. In other words, data we try to replicate
-will contain a linked list of identifiers with the provenance of where that data came and each cluster can make the
-determination of whether or not it has seen this data already (and thus needs to process and propagate it). This also
-lets us treat replication rules as a graph which grants us a common terminology to use when describing replication.
-
-
-This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo
-cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation
-should not prohibit alternative approaches to replication such as other databases or filesystems.
-
-
-### Replication Strategy Implementation
-
-
-Henceforth, both of the RFiles and WAL files that need replication can be treated as a chunk of data. This chunk
-references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice
-property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC
-costs.
-
-
-#### Make the master aware of file to replicate
-
-
-Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let
-master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to
-create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we
-also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column
-qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset
-values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector
-needs to be modified to not delete these files on the master’s HDFS instance until these files are replicated (copied to
-        the peer).
-
-
-#### Choose local TabletServer to perform replication
-
-
-The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds
-some, choose a TabletServer to perform the replication to all peers. The master should use a FATE operation to manage
-the state machine of this replication process. The expected principles, such as exponential backoff on network errors,
-    should be followed. When all peers have reported successfully receiving the file, the master can remove the REPL
-    column for the given chunk. 
-
-
-On the peer, before beginning transfer, the peer should ascertain a new local, unique filename to use for the remote
-file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate
-Tablet. If the peer is also a master (replicating to other nodes), the replicated data should create a new REPL column
-in the peer’s table to repeat the replication process, adding in its cluster identifier to the provenance list.
-Otherwise, the file can be a candidate for deletion by the garbage collection.
-
-
-The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data.
-This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local,
-     then we gain the same locality perks.
-
-
-#### Recurse
-
-
-In our simple master and peer replication scheme, we are done after the new updates are made available on peer. As
-aforementioned, it is relatively easy to “schedule” replication of a new file on peer because we just repeat the same
-process that master did to replicate to peer in the first place.
-
-
-### Master cluster replication “bookkeeping”
-
-
-This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be
-replicated and when is a file safe to be removed.
-
-
-Two key structures are used to implement this bookkeeping:
-
-
-1. Tablet-level entry: tabletId        repl:fully-qualified-file        []        value
-
-
-2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file
-clusterName:remoteTableID        []        value
-
-
-These two key structures will be outlined below, with “*repl* column” and “*~repl* row” denoting which is being referred to.
-
-
-#### Data Structure in Value
-
-
-To avoid the necessity of using conditional mutations or other “transaction-like” operations, we can use a combiner to
-generate an aggregate view of replication information. Protobuf is decent choice, however, the description isn’t tied to
-any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary
-functionality:
-
-
-        ``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been``
-        ``// closed and will have no new data to replicate``
-
-
-        ``State:Enum { OPEN, CLOSED }``
-
-
-        ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }``
-
-
-The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset
-value tracks what data has been replicated to the given cluster and while the replication_needed_offset value tracks how
-much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less
-than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED and
-replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a
-read-only file and the WAL as an append-only file.
-
-
-For *~repl* entries, the target clusterName and remote tableId would be stored in the key to preserve uniqueness. Using
-this information, we would be able to implement the following methods:
-
-
-    ``bool        isFullyReplicated(ReplicationStatus)``
-    ``Pair<long,long> rangeNeedingReplication(ReplicationStatus)``
-
-
-The isFullyReplicated method is straightforward: given the values for start/finish stored for data that needs to be
-replicated, and the values for start/finish stored for data that has been replicated and the state is CLOSED, is there
-still more data for this ReplicationStatus that needs to be replicated for the given clustername/tableID.
-
-
-rangeNeedingReplication is a bit more complicated. Given the end of a range of data that has already been replicated,
-some the end of a range of data that still needs replication, return a range of data that has
-not yet been replicated. For example, if keyvalues up to offset 100 in a WAL have already been
-replicated and keyvalues up to offset 300 are marked as needing replication, this method should
-return [101,300]. Ranges of data replicated, and data needing replication must always be
-disjoint and contiguous to ensure that data is replayed in the correct order on the peer.
-
-
-The use of a Combiner is used to create a basic notion of “addition” and “subtraction”. We cannot use deletes to manage
-this without creating a custom iterator, which would not be desirable since it would required to run over the entire
-accumulo.metadata table. Avoiding deletions exception on cleanup is also desired to avoid handling “tombstone’ing”
-future version of a Key. The addition operation is when new data is appended to the WAL which signifies new data to be
-replicated. This equates to an addition to replication_needed_offset. The subtraction operation is when data from the
-WAL has be successfully replicated to the peer for this *~repl* record. This is implemented as an addition to the
-replication_finished_offset.
-
-
-When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets will be added is
-would be tracked via the *repl* column. As such, a ReplicationStatus “object” is candidate for deletion when the state is
-CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state is always
-propagated over the NEW state. An addition after the state is CLOSED is an invalid operation and would be a logic error.
-
-
-Consider the case of a new data being ingested to the cluster: the following discrete steps should happen. The
-assumption that replication is enabled is made to not distract from the actual steps. As previously mentioned, a
-combiner must be set on the *repl* column to aggregate the values to properly maintain replication state. The following is
-what a tserver will do.
-
-
-1) When a new WAL is created by request of a tserver and the log column is created for a *repl* column within the tablet’s
-row to track that this WAL will need to be replicated.
-
-
-        INSERT
-        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...  -> ReplicationStatus(state=OPEN)
-
-
-2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current
-length of data in the WAL that it just wrote that needs to be read for purposes of replication.
-
-
-        INSERT
-        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...        -> ReplicationStatus(addition
-        offset)
-
-
-3) Eventually, the tablet server will finish using a WAL, minc contents of memory to disk, and mark the WAL as unused.
-This results in updating the state to be CLOSED.
-
-
-        INSERT
-        tablet repl:hdfs://localhost:8020/accumulo/.../wal/…        -> ReplicationStatus(state=CLOSED)
-
-
-The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row
-entries for the file and where it should be replicated to. The high-level goals for this thread are as follows:
-
-
-1) Create mutations for a WAL that outline where the file must be replicated to (cluster and tabletID)
-
-
-        INSERT
-        *~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId        -> ReplicationStatus(addition
-        offset)
-
-
-2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the
-sign that the GC can then remove this file.
-
-
-        DELETE
-        tablet repl:hdfs://localhost:8020/accumulo/.../wal/… 
-
-
-This can be accomplished with a single thread that scans the metadata table:
-
-
-1) Construct “snapshot” of tablet *repl* file entries with aggregated offsets, sorted by file, 
-
-
-        [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
-         hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
-         hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ...] ]
-
-
-2) Begin scanning *~repl* row-prefix with Scanner, perform merged read to join “state” from aggregate *repl* column across
-tablets, and columns in *~repl* row for the file.
-
-
-   for each file in *~repl* rowspace:
-       if all columns in *~repl*_file row isFullyReplicated:
-           issue deletes for file in *repl* column for all tablets with references
-           if delete of *repl* is successful:
-               delete *~repl* row
-       else if *~repl* row exists but no *repl* columns:
-           // Catch failure case from first conditional
-           delete *~repl* row
-       else
-           for each file in “snapshot” of *repl* columns:
-           make mutation for *~repl*_file
-           for each peer cluster in configuration:
-               if file should be replicated on peer:
-                   add column for clusterid:remote_tableID -> RS
-
-
-Combiner should be set on all columns in *~repl* prefix rowspace and the *repl* colfam to ensure multiple runs of the
-described procedure without actual replication occurring to aggregate data that needs replication.  Configuration
-
-
-Replication can be configured on a per-locality-group, replicated that locality group to one or more peers. Given that
-we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult.
-Configuration requires new configuration variables that need to be introduced to support the necessary information. Each
-peer is defined with a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master. The
-API should ease configuration on replication across all locality groups. Replication cannot be configured on the root or
-metadata table.
-
-
-Site-wide:
-// The name and location of other clusters
-instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
-// The name of this cluster
-instance.replication.name=my_cluster_name[d]
-
-Per-table:
-// Declare the locality group(s) that should be replicated and the clusters that they should be replicated to
-table.replication.$locality_group_name=cluster1,cluster2,...
-
-
-Shell commands can also be created to make this configuration easier.
-
-
-definecluster cluster_name zookeeper_quorum
-
-
-e.g.  definecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
-
-
-
-
-deletecluster cluster_name zookeeper_quorum
-
-
-e.g.  deletecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
-
-
-
-
-enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
-
-
-e.g. enablereplication -t foo -lg cf1 peer1 enablereplication -t foo -all-loc-groups peer1
-
-
-
-
-
-
-disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
-
-
-e.g. disablereplication -t foo -lg cf1 peer1 disablereplication -t foo -all-loc-groups peer1
-
-
-For peers, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only.
-This likely requires custom configuration and some ZK state to not accept regular API connections. Should be
-exposed/controllable by the shell, too.  Common Questions
-
-
-*How do conditional mutations work with this approach?*
-
-
-They do not. They will need to throw an Exception.
-
-
-*How does replication work on a table which already contains data?*
-
-
-When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to support
-this as the existing importtable and exporttable already provide support to do this.
-
-
-*When I update a table property on the master, will it propagate to the peer?*
-
-
-There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that
-could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the
-other.
-
-
-As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of
-nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may
-not be desired (e.g. split threshold, compression codecs, etc). Another argument against could be age-off. If a replica
-cluster is not the same size as the production cluster (which is extremely plausible) you would not want the same
-age-off rules for both the production and replica.
-
-
-An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be
-configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also
-difficult in master-master situations as we don’t have a shared ZooKeeper instance that we can use to reliably commit
-these changes.
-
-
-*What happens in master-master if two Keys are exactly the same with different values?*
-
-
-Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528
-
-
-*Did you come up with this all on your own?*
-
-
-Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other Accumulo devs that I’ve bounced
-these ideas off of (too many to enumerate).
-
-
-
-
-Goals
-1. Master-Slave configuration that doesn’t exclude future master-master work Per locality-group replication configuration
-2. Shell administration of replication Accumulo Monitor integration/insight to replication status State machines for
-3. lifecycle of chunks Versionable (read-as protobuf) datastructure to track chunk metadata Thrift for RPC Replication
-4. does not require “closed” files (can send incremental updates to peers) Ability to replicate “live inserts” and “bulk
-5. imports” Provide replication interface with Accumulo->Accumulo implementation Do not rely on active Accumulo Master to
-6. perform replication (send or receive) -- delegate to a TabletServer Use FATE where applicable Gracefully handle
-7. offline peers Implement read-only variant Master/TabletServer[e]
-
-
-Non-Goals
-1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities)
-2. Wire security between master and peer
-3. Support replication of encrypted data[f]
-4. Replication of existing data (use importtable & exporttable)
-5. Enforce replication of table configuration
-
-
-References
-
-
-* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html
-[a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and
-LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could
-contain that? 
-[b] This approach makes the assumption that we only begin the replication process when a WAL is closed.
-This is likely too long of a period of time: an offset and length likely needed to be interested to decrease latency.
-[c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to
-force users to configure it correctly? 
-[d] Same as instance.cluster.$name: Do we need to enforce these values? 
-[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation
-[f] While not in the original scope, it is definitely of great concern.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Accumulo Multi-DataCenter Replication
+=====================================
+
+ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is
+written to one Accumulo instance will automatically be replicated to a separate Accumulo instance.
+
+
+Justification
+-------------
+
+Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the
+potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the
+redundancy of Hadoop in regards to the single point of failure which the namenode previously was. Despite this, there is
+always a varying amount of required administrative intervention to ensure that failure does not result in data loss:
+userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), “expected”
+hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures
+(switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances;
+however, this is not sufficient for multiple reasons with the biggest reason being a lack of automated replication.
+
+
+Background
+----------
+
+Apache HBase has had master-master replication, cyclic replication and multi-peer replication since 0.92. This
+satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which
+both replicate to each other. Both systems can service new writes and will update their “view” of a table from one
+another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the
+master-master strategy in which we may have ultimately have a system which replicates to a system that it receives data
+from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of
+this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic
+region or data center. Multi-peer replication is a relatively simple in that a single master system will replicate to
+multiple peers instead of just one.
+
+
+While these are relatively different to one another, I believe most can be satisfied through a single, master-push,
+      replication implementation. Although, the proposed data structure should also be capable of supporting a
+      peer-pull strategy.
+
+
+Implementation
+--------------
+
+As a first implementation, I will prototype a single master with multiple peer replication strategy. This should grant
+us the most flexibility and the most functionality. The general implementation should be capable of application to the
+other replication structures (master-master and cyclic-replication). I’ll outline a simple master-peer replication use
+case, followed by application of this approach to replication cycles and master-master replication. This approach does
+not consider conditional mutations.
+
+
+### Replication Framework
+
+In an attempt to be as clear as possible, I’ll use the following terminology when explaining the implementation: master
+will refer to the “master” Accumulo cluster (the system accepting new writes), peer will refer to the “peer” Accumulo
+cluster (the system which does not receive new data through the Accumulo client API, but only from master through
+        replication). The design results in an eventual consistency model of replication which will allow for peers to
+be offline and the online master to still process new updates.
+
+
+In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the
+peer. In practice, this new file can either be an RFile that was bulk-imported to master or this can be a write-ahead
+log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is
+being written to Accumulo is it written to a sorted, in-memory map and an append-only WAL file. While the in-memory map
+provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new
+updates at the RFile level. As such, this proposed implementation uses the WAL as the transport “file format”[a]. While
+it is noted that in sending a WAL to multiple peers, each peer will need to reprocess each WAL to make Mutations to
+apply whereas they could likely be transformed once, that is left as a future optimization.
+
+
+To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked to begin the replication
+process before a WAL is closed. We can bin these mutations together for lazy replication, combining the mutations bound
+for each target server into a single write-set message, which amortizes the cost. It is not apparent that this requires
+co-location within each source tablet in the Accumulo metadata table, which means that the worry of inadvertent errors
+caused by placing this data in the metadata table is entirely removed.
+
+
+In every replication graph, which consists of master(s) and peer(s), each system should have a unique identifier. It is
+desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems
+participating.
+
+
+These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate
+some data when that request already contains the current cluster’s identifier. In other words, data we try to replicate
+will contain a linked list of identifiers recording the provenance of that data, and each cluster can determine
+whether or not it has already seen this data (and thus whether it needs to process and propagate it). This also
+lets us treat replication rules as a graph which grants us a common terminology to use when describing replication.
+
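The cycle check described above can be sketched as follows. This is only an illustration, assuming the provenance is carried as a plain list of cluster identifier strings; the function name `next_provenance` and the identifiers `"A"` and `"B"` are hypothetical and not part of the proposal.

```python
from typing import List, Optional


def next_provenance(provenance: List[str], local_cluster_id: str) -> Optional[List[str]]:
    """Return the provenance list to forward, or None if the data should be ignored.

    `provenance` lists the clusters the data has already passed through,
    oldest first (hypothetical representation).
    """
    if local_cluster_id in provenance:
        # This cluster already appears in the chain, so the data has been
        # seen here before and must not be processed or propagated again.
        return None
    # First time this cluster sees the data: record itself before forwarding.
    return provenance + [local_cluster_id]


# Cyclic setup A -> B -> A: B accepts data from A, A rejects it on the way back.
at_b = next_provenance(["A"], "B")
back_at_a = next_provenance(at_b, "A")
```

Here `at_b` is `["A", "B"]` and `back_at_a` is `None`, which is what breaks the cycle.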
+
+This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo
+cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation
+should not prohibit alternative approaches to replication such as other databases or filesystems.
+
+
+### Replication Strategy Implementation
+
+
+Henceforth, both the RFiles and the WAL files that need replication can be treated as chunks of data. Each chunk
+references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice
+property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC
+costs.
+
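To illustrate why the offset-plus-length representation combines well, here is a minimal sketch of merging sequential chunks. The `Chunk` type and `combine_chunks` function are hypothetical names, and a real implementation would live in an Accumulo Combiner rather than in a standalone function.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Chunk:
    source: str   # URI of the RFile or WAL (hypothetical field name)
    offset: int   # start offset within the source
    length: int   # number of units covered by this chunk


def combine_chunks(chunks: List[Chunk]) -> List[Chunk]:
    """Merge chunks of the same source that sit back to back."""
    merged: List[Chunk] = []
    for chunk in sorted(chunks, key=lambda c: (c.source, c.offset)):
        last = merged[-1] if merged else None
        if (last is not None
                and last.source == chunk.source
                and last.offset + last.length == chunk.offset):
            # The new chunk starts exactly where the previous one ends.
            merged[-1] = Chunk(last.source, last.offset, last.length + chunk.length)
        else:
            merged.append(chunk)
    return merged


combined = combine_chunks([Chunk("wal1", 100, 50), Chunk("wal1", 0, 100), Chunk("wal1", 200, 10)])
```

The first two chunks collapse into one chunk covering offsets 0 to 150, so a single RPC can ship both; the third stays separate because of the gap before offset 200.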
+
+#### Make the master aware of the file to replicate
+
+
+Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let
+master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to
+create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we
+also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column
+qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset
+values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector
+needs to be modified to not delete these files on the master’s HDFS instance until these files are replicated (copied to
+the peer).
+
+
+#### Choose local TabletServer to perform replication
+
+
+The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds
+some, it chooses a TabletServer to perform the replication to all peers. The master should use a FATE operation to manage
+the state machine of this replication process. The expected principles, such as exponential backoff on network errors,
+should be followed. When all peers have reported successfully receiving the file, the master can remove the REPL
+column for the given chunk.
+
+
+On the peer, before beginning transfer, the peer should ascertain a new local, unique filename to use for the remote
+file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate
+Tablet. If the peer is also a master (replicating to other nodes), the replicated data should create a new REPL column
+in the peer’s table to repeat the replication process, adding in its cluster identifier to the provenance list.
+Otherwise, the file can be a candidate for deletion by the garbage collector.
+
+
+The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data.
+This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local,
+then we gain the same locality perks.
+
+
+#### Recurse
+
+
+In our simple master and peer replication scheme, we are done after the new updates are made available on peer. As
+mentioned above, it is relatively easy to “schedule” replication of a new file on peer because we just repeat the same
+process that master did to replicate to peer in the first place.
+
+
+### Master cluster replication “bookkeeping”
+
+
+This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be
+replicated and when a file is safe to remove.
+
+
+Two key structures are used to implement this bookkeeping:
+
+
+1. Tablet-level entry: tabletId        repl:fully-qualified-file        []        value
+
+
+2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file
+clusterName:remoteTableID        []        value
+
+
+These two key structures will be outlined below, with “*repl* column” and “*~repl* row” denoting which is being referred to.
+
+
+#### Data Structure in Value
+
+
+To avoid the necessity of using conditional mutations or other “transaction-like” operations, we can use a combiner to
+generate an aggregate view of replication information. Protobuf is a decent choice; however, the description isn’t tied to
+any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary
+functionality:
+
+
+        ``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been``
+        ``// closed and will have no new data to replicate``
+
+
+        ``State:Enum { OPEN, CLOSED }``
+
+
+        ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }``
+
+
+The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset
+value tracks what data has been replicated to the given cluster, while the replication_needed_offset value tracks how
+much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less
+than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED and
+replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a
+read-only file and the WAL as an append-only file.
+
+
+For *~repl* entries, the target clusterName and remote tableId would be stored in the key to preserve uniqueness. Using
+this information, we would be able to implement the following methods:
+
+
+    ``bool        isFullyReplicated(ReplicationStatus)``
+    ``Pair<long,long> rangeNeedingReplication(ReplicationStatus)``
+
+
+The isFullyReplicated method is straightforward: given the offset stored for data that needs to be replicated, the
+offset stored for data that has been replicated, and whether the state is CLOSED, determine whether any data for
+this ReplicationStatus still needs to be replicated for the given clusterName/tableId.
+
+
+rangeNeedingReplication is a bit more complicated. Given the end of a range of data that has already been replicated
+and the end of a range of data that still needs replication, return a range of data that has
+not yet been replicated. For example, if key-values up to offset 100 in a WAL have already been
+replicated and key-values up to offset 300 are marked as needing replication, this method should
+return [101,300]. Ranges of data already replicated and data needing replication must always be
+disjoint and contiguous to ensure that data is replayed in the correct order on the peer.
+
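The two methods can be sketched directly from the structure above. This is an illustration under stated assumptions, not the proposed implementation: the Python names are snake_case renderings of the names in the text, the inclusive `[finished + 1, needed]` convention is taken from the `[101,300]` example, and returning `None` when nothing is pending is an assumption of this sketch.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple


class State(Enum):
    OPEN = 0
    CLOSED = 1


@dataclass(frozen=True)
class ReplicationStatus:
    state: State
    replication_needed_offset: int = 0
    replication_finished_offset: int = 0


def is_fully_replicated(status: ReplicationStatus) -> bool:
    """True only when the file is closed and everything written has been replicated."""
    return (status.state is State.CLOSED
            and status.replication_finished_offset == status.replication_needed_offset)


def range_needing_replication(status: ReplicationStatus) -> Optional[Tuple[int, int]]:
    """Inclusive range of offsets still to replicate, or None if nothing is pending."""
    if status.replication_finished_offset >= status.replication_needed_offset:
        return None  # assumption: an empty range is reported as None
    return (status.replication_finished_offset + 1, status.replication_needed_offset)


pending = range_needing_replication(ReplicationStatus(State.OPEN, 300, 100))
```

With 100 replicated and 300 needed, `pending` is `(101, 300)`, matching the example in the text. An OPEN file is never fully replicated, even when both offsets are equal, because more data may still be appended.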
+
+A Combiner is used to create a basic notion of “addition” and “subtraction”. We cannot use deletes to manage
+this without creating a custom iterator, which would not be desirable since it would be required to run over the entire
+accumulo.metadata table. Avoiding deletions except on cleanup is also desired to avoid handling “tombstone’ing”
+future versions of a Key. The addition operation is when new data is appended to the WAL, which signifies new data to be
+replicated. This equates to an addition to replication_needed_offset. The subtraction operation is when data from the
+WAL has been successfully replicated to the peer for this *~repl* record. This is implemented as an addition to the
+replication_finished_offset.
+
+
+When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets will be added, as
+would be tracked via the *repl* column. As such, a ReplicationStatus “object” is a candidate for deletion when the state is
+CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state is always
+propagated over the OPEN state. An addition after the state is CLOSED is an invalid operation and would be a logic error.
+
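The Combiner semantics can be sketched as a reduction over the values written for one key. This is a simplified stand-in, not an actual Accumulo Combiner: `combine` and `reduce_statuses` are hypothetical names, offsets are treated as deltas that are summed, and the sketch does not detect the invalid "addition after CLOSED" case.

```python
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from typing import Iterable


class State(Enum):
    OPEN = 0
    CLOSED = 1


@dataclass(frozen=True)
class ReplicationStatus:
    state: State = State.OPEN
    replication_needed_offset: int = 0
    replication_finished_offset: int = 0


def combine(a: ReplicationStatus, b: ReplicationStatus) -> ReplicationStatus:
    """Sum both offsets; CLOSED wins over OPEN regardless of order."""
    closed = State.CLOSED in (a.state, b.state)
    return ReplicationStatus(
        State.CLOSED if closed else State.OPEN,
        a.replication_needed_offset + b.replication_needed_offset,
        a.replication_finished_offset + b.replication_finished_offset,
    )


def reduce_statuses(values: Iterable[ReplicationStatus]) -> ReplicationStatus:
    """Aggregate every value written for one key into a single status."""
    return reduce(combine, values, ReplicationStatus())


aggregate = reduce_statuses([
    ReplicationStatus(),                                 # WAL created
    ReplicationStatus(replication_needed_offset=100),    # "addition"
    ReplicationStatus(replication_needed_offset=200),    # "addition"
    ReplicationStatus(replication_finished_offset=300),  # "subtraction"
    ReplicationStatus(state=State.CLOSED),               # WAL closed
])
```

After these five writes the aggregate is CLOSED with both offsets at 300, so the record is a candidate for deletion. Because the reduction is commutative and associative, no conditional mutation is needed to keep the aggregate consistent.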
+
+Consider the case of new data being ingested to the cluster: the following discrete steps should happen. The
+assumption that replication is enabled is made to not distract from the actual steps. As previously mentioned, a
+combiner must be set on the *repl* column to aggregate the values to properly maintain replication state. The following is
+what a tserver will do.
+
+
+1) When a new WAL is created by request of a tserver and the log column is created, a *repl* column is also created
+within the tablet’s row to track that this WAL will need to be replicated.
+
+
+        INSERT
+        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...  -> ReplicationStatus(state=OPEN)
+
+
+2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current
+length of data in the WAL that it just wrote that needs to be read for purposes of replication.
+
+
+        INSERT
+        tablet        repl:hdfs://localhost:8020/accumulo/.../wal/...        -> ReplicationStatus(addition offset)
+
+
+3) Eventually, the tablet server will finish using a WAL, minor compact (minc) the contents of memory to disk, and mark
+the WAL as unused. This results in updating the state to be CLOSED.
+
+
+        INSERT
+        tablet repl:hdfs://localhost:8020/accumulo/.../wal/…        -> ReplicationStatus(state=CLOSED)
+
+
+The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row
+entries for the file and where it should be replicated to. The high-level goals for this thread are as follows:
+
+
+1) Create mutations for a WAL that outline where the file must be replicated to (cluster and tableId)
+
+
+        INSERT
+        *~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId        -> ReplicationStatus(addition offset)
+
+
+2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the
+sign that the GC can then remove this file.
+
+
+        DELETE
+        tablet repl:hdfs://localhost:8020/accumulo/.../wal/… 
+
+
+This can be accomplished with a single thread that scans the metadata table:
+
+
+1) Construct “snapshot” of tablet *repl* file entries with aggregated offsets, sorted by file:
+
+
+        [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
+         hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
+         hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ... } ]
+
+
+2) Begin scanning *~repl* row-prefix with Scanner, perform merged read to join “state” from aggregate *repl* column across
+tablets, and columns in *~repl* row for the file.
+
+
+   for each file in *~repl* rowspace:
+       if all columns in *~repl*_file row isFullyReplicated:
+           issue deletes for file in *repl* column for all tablets with references
+           if delete of *repl* is successful:
+               delete *~repl* row
+       else if *~repl* row exists but no *repl* columns:
+           // Catch failure case from first conditional
+           delete *~repl* row
+       else:
+           for each file in “snapshot” of *repl* columns:
+               make mutation for *~repl*_file
+               for each peer cluster in configuration:
+                   if file should be replicated on peer:
+                       add column for clusterid:remote_tableID -> RS
+
+
+A Combiner should be set on all columns in the *~repl* prefix rowspace and the *repl* colfam so that multiple runs of the
+described procedure, without actual replication occurring, correctly aggregate the data that needs replication.
+
+
+### Configuration
+
+
+Replication can be configured on a per-locality-group basis, replicating that locality group to one or more peers. Given
+that we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult.
+Configuration requires new configuration variables to carry the necessary information. Each
+peer is defined with a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master. The
+API should ease configuration of replication across all locality groups. Replication cannot be configured on the root or
+metadata table.
+
+
+Site-wide:
+// The name and location of other clusters
+instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
+// The name of this cluster
+instance.replication.name=my_cluster_name[d]
+
+Per-table:
+// Declare the locality group(s) that should be replicated and the clusters that they should be replicated to
+table.replication.$locality_group_name=cluster1,cluster2,...
+
+
+Shell commands can also be created to make this configuration easier.
+
+
+definecluster cluster_name zookeeper_quorum
+
+
+e.g.  definecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
+
+
+
+
+deletecluster cluster_name zookeeper_quorum
+
+
+e.g.  deletecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
+
+
+
+
+enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
+
+
+e.g. enablereplication -t foo -lg cf1 peer1
+
+
+e.g. enablereplication -t foo --all-loc-groups peer1
+
+
+
+
+
+
+disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
+
+
+e.g. disablereplication -t foo -lg cf1 peer1
+
+
+e.g. disablereplication -t foo --all-loc-groups peer1
+
+
+For peers, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only.
+This likely requires custom configuration and some ZK state to not accept regular API connections. This should be
+exposed/controllable by the shell, too.
+
+
+### Common Questions
+
+
+*How do conditional mutations work with this approach?*
+
+
+They do not. They will need to throw an Exception.
+
+
+*How does replication work on a table which already contains data?*
+
+
+When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to
+replicate data that already exists in the table, as the existing importtable and exporttable commands already support this.
+
+
+*When I update a table property on the master, will it propagate to the peer?*
+
+
+There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that
+could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the
+other.
+
+
+As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of
+nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may
+not be desired (e.g. split threshold, compression codecs, etc). Another argument against could be age-off. If a replica
+cluster is not the same size as the production cluster (which is extremely plausible) you would not want the same
+age-off rules for both the production and replica.
+
+
+An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be
+configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also
+difficult in master-master situations as we don’t have a shared ZooKeeper instance that we can use to reliably commit
+these changes.
+
+
+*What happens in master-master if two Keys are exactly the same with different values?*
+
+
+Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528
+
+
+*Did you come up with this all on your own?*
+
+
+Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other Accumulo devs that I’ve bounced
+these ideas off of (too many to enumerate).
+
+
+
+
+### Goals
+1. Master-Slave configuration that doesn’t exclude future master-master work
+2. Per locality-group replication configuration
+3. Shell administration of replication
+4. Accumulo Monitor integration/insight to replication status
+5. State machines for lifecycle of chunks
+6. Versionable (read-as protobuf) datastructure to track chunk metadata
+7. Thrift for RPC
+8. Replication does not require “closed” files (can send incremental updates to peers)
+9. Ability to replicate “live inserts” and “bulk imports”
+10. Provide replication interface with Accumulo->Accumulo implementation
+11. Do not rely on active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer
+12. Use FATE where applicable
+13. Gracefully handle offline peers
+14. Implement read-only variant Master/TabletServer[e]
+
+
+### Non-Goals
+1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities)
+2. Wire security between master and peer
+3. Support replication of encrypted data[f]
+4. Replication of existing data (use importtable & exporttable)
+5. Enforce replication of table configuration
+
+
+### References
+
+
+* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html
+[a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and
+LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could
+contain that? 
+[b] This approach makes the assumption that we only begin the replication process when a WAL is closed.
+This is likely too long of a period of time: an offset and length likely need to be inserted to decrease latency.
+[c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to
+force users to configure it correctly? 
+[d] Same as instance.cluster.$name: Do we need to enforce these values? 
+[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation
+[f] While not in the original scope, it is definitely of great concern.

