couchdb-commits mailing list archives

Subject [couchdb] branch 63012-scheduler updated: [fixup] Add developer-oriented
Date Wed, 12 Apr 2017 16:05:30 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository

The following commit(s) were added to refs/heads/63012-scheduler by this push:
       new  2bde652   [fixup] Add developer-oriented
2bde652 is described below

commit 2bde652105d59353050275485cb5865c3a084f73
Author: Nick Vatamaniuc <>
AuthorDate: Wed Apr 12 12:05:24 2017 -0400

    [fixup] Add developer-oriented
 src/couch_replicator/ | 297 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 297 insertions(+)

diff --git a/src/couch_replicator/ b/src/couch_replicator/
new file mode 100644
index 0000000..1b0e01b
--- /dev/null
+++ b/src/couch_replicator/
@@ -0,0 +1,297 @@
+Developer Oriented Replicator Description
+This description of the scheduling replicator's functionality is mainly geared
+to CouchDB developers. It dives a bit into the internals and explains how
+everything is connected together.
+A natural place to start is the top application supervisor:
+`couch_replicator_sup`. It's a `rest_for_one` supervisor, so if a child process
+terminates, the rest of the children in the hierarchy following it are also
+terminated. This structure implies a useful constraint -- children to the
+"right", if viewing it vertically with the root at the top, can safely call
+children on the "left", because this supervisor ensures those on the "left"
+will already be started and running.
+A description of each child:
+ * `couch_replication_event`: Starts a gen_event publication bus to handle some
+    replication related events. This is used, for example, to publish cluster
+    membership changes by the `couch_replicator_clustering` process. It is
+    also used in replication tests to monitor for replication events.
+    Notification is performed via the `couch_replicator_notifier:notify/1`
+    function. It's the first (left-most) child because
+    `couch_replicator_clustering` uses it.
+ * `couch_replicator_clustering`: This module maintains cluster membership
+    information for the replication application and provides functions to check
+    ownership of replication jobs. A cluster membership change is published via
+    the `gen_event` event server set up in the `couch_replication_event` child
+    above. Published events are `{cluster, stable}` when cluster membership has
+    stabilized, that is, it is not fluctuating anymore, and
+    `{cluster, unstable}`, which indicates there was a recent change to the
+    cluster membership and it is now considered unstable. Listeners for cluster
+    membership change include `couch_replicator_doc_processor` and
+    `couch_replicator_db_changes`. When the doc processor gets a
+    `{cluster, stable}` event it will remove all the replication jobs not
+    belonging to the current node. When `couch_replicator_db_changes` gets a
+    `{cluster, stable}` event, it will restart the `couch_multidb_changes`
+    process it controls, which will launch a new scan of all the replicator
+    databases.
+ * `couch_replicator_connection`: Maintains a global replication connection
+    pool. It allows reusing connections across replication tasks. The main
+    interface is `acquire/1` and `release/1`. The main idea here is that once a
+    connection is established, it is kept around for
+    `replicator.connection_close_interval` milliseconds in case another
+    replication task wants to re-use it. It is worth pointing out how linking
+    and monitoring is handled: Workers are linked to the connection pool when
+    they are created. If they crash, the connection pool receives the EXIT
+    event and cleans up. The connection pool also monitors owners (by
+    monitoring the `Pid` from the `From` argument in the call to `acquire/1`)
+    and cleans up if an owner dies. Another interesting thing is that
+    connection establishment (creation) happens in the owner process so the
+    pool is not blocked on it.
+ * `couch_replicator_rate_limiter`: Implements a rate limiter to handle
+    connection throttling from sources or targets where requests return 429
+    error codes. Uses the Additive Increase / Multiplicative Decrease feedback
+    control algorithm to converge on the channel capacity. Implemented using a
+    16-way sharded ETS table to maintain connection state. The table sharding
+    code is split out to the `couch_replicator_rate_limiter_tables` module. The
+    main idea of the module is to maintain and continually estimate an interval
+    for each connection represented by the `{Method, Url}` pair. The interval
+    is updated accordingly on each call to `failure/1` or `success/1`. A
+    `failure/1` is supposed to be called after a 429 is received and
+    `success/1` when a successful request has been made. Also, when no failures
+    are happening the code ensures the ETS tables are empty in order to
+    have a lower impact on a running system.
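+    The interval update can be sketched roughly as below. This only
+    illustrates the Additive Increase / Multiplicative Decrease idea applied to
+    a wait interval. The module name, the function names and all the constants
+    are assumptions made for this example and are not taken from
+    `couch_replicator_rate_limiter`.
+    ```erlang
+    -module(aimd_interval_sketch).
+    -export([on_success/1, on_failure/1]).
+    %% All constants are illustrative assumptions (milliseconds).
+    -define(STEP_MSEC, 50).     % fixed amount removed after a success
+    -define(FACTOR, 2).         % multiplier applied after a failure
+    -define(MIN_MSEC, 25).      % first interval after a failure
+    -define(MAX_MSEC, 25000).   % upper bound on the interval
+    %% A successful request: shorten the wait interval by a fixed step.
+    on_success(Interval) when is_integer(Interval), Interval >= 0 ->
+        max(0, Interval - ?STEP_MSEC).
+    %% A 429 response: lengthen the wait interval by a constant factor.
+    on_failure(0) ->
+        ?MIN_MSEC;
+    on_failure(Interval) when is_integer(Interval), Interval > 0 ->
+        min(?MAX_MSEC, Interval * ?FACTOR).
+    ```
+    With these assumed constants, repeated failures grow the interval as 25,
+    50, 100 and so on up to the cap, while each success shortens it by 50.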
+ * `couch_replicator_scheduler`: The scheduler is the core component of the
+    scheduling replicator. It allows handling a larger number of jobs than
+    might be possible to actively run on the cluster. It accomplishes this by
+    switching between jobs (stopping some and starting others) to ensure all
+    make progress. Replication jobs which fail are penalized using exponential
+    backoff. That is, each consecutive failure will double the time penalty.
+    This frees up system resources for more useful work than just continuously
+    trying to run the same subset of failing jobs.
+    The main API function is `add_job/1`. Its argument is an instance of the
+    `#rep{}` record, which could be the result of a document update from a
+    `_replicator` db or the result of a POST to the `_replicate`
+    endpoint. Once the replication job is added to the scheduler it doesn't
+    matter much where it originated.
+    Each job internally is represented by the `#job{}` record. It contains the
+    original `#rep{}` but also, among a few other things, maintains an event
+    history. The history maintains a sequence of events for each job. These are
+    timestamped and ordered such that the most recent event is at the head.
+    History length is limited based on the `replicator.max_history` config
+    value. The default is 20 entries. History event types are:
+    * `added` : job was just added to the scheduler. This is the first event.
+    * `started` : job was started. This was an attempt to run the job.
+    * `stopped` : job was stopped by the scheduler.
+    * `crashed` : job has crashed (instead of stopping cleanly).
+    The core of the algorithm is the `reschedule/1` function. That function is
+    called every `replicator.interval` milliseconds (default is 60000, i.e. a
+    minute). During each call the scheduler will try to stop some jobs, start
+    some new ones and will also try to keep the number of running jobs within
+    the `replicator.max_jobs` limit (default is 500). So the function does
+    these operations (actual code paste):
+    ```
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    stop_excess_jobs(State, Running),
+    start_pending_jobs(State, Running, Pending),
+    rotate_jobs(State, Running, Pending),
+    update_running_jobs_stats(State#state.stats_pid)
+    ```
+    `Running` is the total number of currently running jobs. `Pending`
+    is the total number of jobs waiting to be run. `stop_excess_jobs` will stop
+    any jobs exceeding the configured `replicator.max_jobs` limit. This code
+    takes effect if the user reduces the `max_jobs` configuration value.
+    `start_pending_jobs` will start any jobs if there is more room available.
+    This will take effect on startup or when the user increases the `max_jobs`
+    configuration value. `rotate_jobs` is where all the action happens. There
+    the scheduler picks `replicator.max_churn` running jobs to stop and then
+    picks the same number of pending jobs to start. The default value of
+    `max_churn` is 20. So by default, every minute 20 running jobs are stopped
+    and 20 new pending jobs are started.
+    Before moving on it is worth pointing out that the scheduler treats
+    continuous and non-continuous replications differently. Normal
+    (non-continuous) replications, once started, will be allowed to run to
+    completion. That behavior is to preserve their semantics of replicating a
+    snapshot of the source database to the target. For example, if new
+    documents are added to the source after the replication is started, those
+    updates should not show up on the target database. Stopping and restarting
+    a normal replication would violate that constraint. The only exception to
+    the rule is when the user explicitly reduces the `replicator.max_jobs`
+    configuration value. Even then the scheduler will first attempt to stop as
+    many continuous jobs as possible and only if it has no choice left will it
+    stop normal jobs.
+    Keeping that in mind and going back to the scheduling algorithm, the next
+    interesting part is how the scheduler picks which jobs to stop and which
+    ones to start:
+    * Stopping: When picking jobs to stop the scheduler will pick the longest
+      running continuous jobs first. The sorting callback function to get the
+      longest running jobs is unsurprisingly called `longest_running/2`. To
+      pick the longest running jobs it looks at the most recent `started`
+      event. After it gets a list sorted by longest running, it simply
+      picks the first few depending on the value of `max_churn` using
+      `lists:sublist/2`. Then those jobs are stopped.
+    * Starting: When starting the scheduler will pick the jobs which have been
+      waiting the longest. Surprisingly, in this case it also looks at the
+      `started` timestamp and picks the jobs which have the oldest `started`
+      timestamp. If there are 3 jobs, A[started=10], B[started=7],
+      C[started=9], then B will be picked first, then C then A. This ensures
+      that jobs are not starved, which is a classic scheduling pitfall.
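+    Both selections boil down to ordering jobs by their last `started`
+    timestamp and taking the first few. A minimal sketch of that idea follows.
+    The module name, the function name and the `{Id, LastStarted}` job shape
+    are assumptions made for this example. The actual scheduler works on
+    `#job{}` records and their history.
+    ```erlang
+    -module(pick_jobs_sketch).
+    -export([oldest_started/2]).
+    %% Jobs are {Id, LastStarted} pairs. A smaller timestamp means the job
+    %% was last started longer ago. Returns up to N job ids, oldest first.
+    oldest_started(Jobs, N) when is_integer(N), N >= 0 ->
+        Sorted = lists:sort(fun({_, T1}, {_, T2}) -> T1 =< T2 end, Jobs),
+        [Id || {Id, _} <- lists:sublist(Sorted, N)].
+    ```
+    For the three jobs above,
+    `pick_jobs_sketch:oldest_started([{a, 10}, {b, 7}, {c, 9}], 3)` returns
+    `[b, c, a]`.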
+    In the code, the list of pending jobs is picked slightly differently than
+    how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl`
+    to iterate over all the pending jobs. As it iterates it tries to keep only
+    up to `max_churn` oldest items in the accumulator. The reason this is done
+    is that there could be a very large number of pending jobs and loading them
+    all in a list (making a copy from ETS) and then sorting it can be quite
+    expensive performance-wise. The tricky part of the iteration happens
+    in `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the
+    top-N longest waiting jobs so far. The code has a comment with a helpful
+    example of how this algorithm works.
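+    The bounded top-N idea can be sketched as below. This is not the code from
+    `pending_maybe_replace/2`. The module and function names are made up for
+    the example, and it folds over a plain list with `lists:foldl/3` instead of
+    an ETS table so that it stays self-contained.
+    ```erlang
+    -module(top_n_sketch).
+    -export([oldest_n/2]).
+    %% Keep only the N entries with the smallest timestamps while walking
+    %% the input once. Entries are {Timestamp, Id}, so the ordered set
+    %% sorts them by time.
+    oldest_n(Entries, N) when is_integer(N), N > 0 ->
+        Fun = fun(Entry, Acc) -> maybe_replace(Entry, Acc, N) end,
+        gb_sets:to_list(lists:foldl(Fun, gb_sets:new(), Entries)).
+    %% While the set is not full, just add. Once it is full, the new entry
+    %% replaces the current largest one only if it is smaller.
+    maybe_replace(Entry, Set, N) ->
+        case gb_sets:size(Set) < N of
+            true ->
+                gb_sets:add(Entry, Set);
+            false ->
+                {Largest, Rest} = gb_sets:take_largest(Set),
+                case Entry < Largest of
+                    true -> gb_sets:add(Entry, Rest);
+                    false -> Set
+                end
+        end.
+    ```
+    `top_n_sketch:oldest_n([{10, a}, {7, b}, {9, c}], 2)` returns
+    `[{7, b}, {9, c}]`, and the accumulator never holds more than N entries.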
+    The last part is how the scheduler treats jobs which keep crashing. If a
+    job is started but then crashes then that job is considered unhealthy. The
+    main idea is to penalize such jobs such that they are forced to wait an
+    exponentially larger amount of time with each consecutive crash. A central
+    part to this algorithm is determining what forms a sequence of consecutive
+    crashes. If a job starts then quickly crashes, and after next start it
+    crashes again, then that would become a sequence of 2 consecutive crashes.
+    The penalty then would be calculated by the `backoff_micros/1` function,
+    where the consecutive crash count would end up as the exponent. However,
+    for practical reasons there is also a maximum penalty specified and that's
+    the equivalent of 10 consecutive crashes. Timewise it ends up being about 8
+    hours. That means even a job which keeps crashing will still get a chance
+    to retry once in 8 hours.
+    There is a subtlety when calculating consecutive crashes and that is
+    deciding when the sequence stops. That is, figuring out when a job becomes
+    healthy again. The scheduler considers a job healthy again if it started
+    and hasn't crashed in a while. The "in a while" part is a configuration
+    parameter, `replicator.health_threshold`, defaulting to 2 minutes. This
+    means if a job has been crashing, for example 5 times in a row, but then on
+    the 6th attempt it started and ran for more than 2 minutes, then it is
+    considered healthy again. Next time it crashes its sequence of consecutive
+    crashes will restart at 1.
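+    A rough sketch of the penalty and of counting consecutive crashes follows.
+    It is a simplification and not the scheduler's code. The module and
+    function names, the `{Event, TimestampSec}` history shape, the use of
+    seconds instead of microseconds and the 30 second base penalty are all
+    assumptions. The base was picked only so that the cap lands near the
+    "about 8 hours" mentioned above.
+    ```erlang
+    -module(backoff_sketch).
+    -export([penalty_sec/1, consecutive_crashes/2]).
+    -define(BASE_SEC, 30).       % assumed base penalty
+    -define(MAX_EXPONENT, 10).   % cap of 10 consecutive crashes
+    %% The penalty doubles with each consecutive crash, up to the cap.
+    penalty_sec(0) ->
+        0;
+    penalty_sec(Crashes) when is_integer(Crashes), Crashes > 0 ->
+        ?BASE_SEC * (1 bsl min(Crashes, ?MAX_EXPONENT)).
+    %% History is newest first. A crash that happened more than
+    %% ThresholdSec after the event before it ends the sequence, because
+    %% the job had been healthy up to that crash.
+    consecutive_crashes(History, ThresholdSec) ->
+        count(History, ThresholdSec, 0).
+    count([{crashed, CrashT}, {_, PrevT} = Prev | Rest], Threshold, Count) ->
+        case CrashT - PrevT > Threshold of
+            true -> Count + 1;
+            false -> count([Prev | Rest], Threshold, Count + 1)
+        end;
+    count([{crashed, _}], _Threshold, Count) ->
+        Count + 1;
+    count([{stopped, _}, {started, _} | _], _Threshold, Count) ->
+        Count;
+    count([_ | Rest], Threshold, Count) ->
+        count(Rest, Threshold, Count);
+    count([], _Threshold, Count) ->
+        Count.
+    ```
+    With the assumed base, `penalty_sec(10)` is 30720 seconds, roughly 8.5
+    hours. A history of
+    `[{crashed, 500}, {started, 100}, {crashed, 95}, {started, 90}]` with a
+    threshold of 120 yields a count of 1, since the last run lasted 400
+    seconds.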
+ * `couch_replicator_scheduler_sup`: This module is a supervisor for running
+   replication tasks. The most interesting thing about it is perhaps that it is
+   not used to restart children. Scheduler handles restarts and error handling
+   backoffs.
+ * `couch_replicator_doc_processor`: The doc processor component is in charge
+   of processing replication document updates, turning them into replication
+   jobs and adding those jobs to the scheduler. Unfortunately the only reason
+   there is even a `couch_replicator_doc_processor` gen_server, instead of
+   replication documents being turned into jobs and inserted into the scheduler
+   directly, is one corner case: filtered replications using custom (mostly
+   JavaScript) filters. More about it later. It is better to start with how
+   updates flow through the doc processor:
+   Document updates come in via the `db_change/3` callback from
+   `couch_multidb_changes`, then go to the `process_change/2` function.
+   In `process_change/2` a few decisions are made regarding how to proceed. The
+   first is "ownership" checking. That is a check of whether the replication
+   document belongs on the current node. If not, then it is ignored. Another
+   check is to see if the update has arrived during a time when the cluster is
+   considered "unstable". If so, it is ignored, because soon enough a rescan
+   will be launched and all the documents will be reprocessed anyway. Another
+   noteworthy thing in `process_change/2` is the handling of upgrades from the
+   previous version of the replicator, when transient states were written to
+   the documents. Two such states were `triggered` and `error`. Both of those
+   states are removed from the document, then the update proceeds in the
+   regular fashion. `failed` documents are also ignored here. `failed` is a
+   terminal state which indicates the document was somehow unsuitable to become
+   a replication job (it was malformed or a duplicate). Otherwise the state
+   update proceeds to `process_updated/2`.
+   `process_updated/2` is where replication document updates are parsed and
+   translated to `#rep{}` records. The interesting part here is that the
+   replication ID isn't calculated yet. Unsurprisingly the parsing function
+   used is called `parse_rep_doc_without_id/1`. Also note that up until now
+   everything is still running in the context of the `db_change/3`
+   callback. After the replication filter type is determined, the update
+   finally gets passed to the `couch_replicator_doc_processor` gen_server.
+   The `couch_replicator_doc_processor` gen_server's main role is to try to
+   calculate replication IDs for each `#rep{}` record passed to it, then add
+   that as a scheduler job. As noted before, `#rep{}` records parsed up until
+   this point lack a replication ID. The reason is that replication ID
+   calculation includes a hash of the filter code. And because user defined
+   replication filters live in the source DB, which most likely involves a
+   remote network fetch, there is a possibility of blocking, and a need to
+   handle various network failures and retries. Because of that
+   `couch_replicator_doc_processor` dispatches most of that blocking and
+   retrying to a separate `worker` process
+   (`couch_replicator_doc_processor_worker` module).
+   `couch_replicator_doc_processor_worker` is where replication IDs are
+   calculated for each individual doc update. There are two separate modules
+   which contain utilities related to replication ID calculation:
+   `couch_replicator_ids` and `couch_replicator_filters`. The first one
+   contains ID calculation algorithms and the second one knows how to parse and
+   fetch user filters from a remote source DB. One interesting thing about the
+   worker is that it is time-bounded and is guaranteed to not be stuck forever.
+   That's why it spawns an extra process with `spawn_monitor`, just so it can
+   use an `after` clause in `receive` and bound the maximum time this worker
+   will take.
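+   The time-bounding pattern looks roughly like the sketch below. It is a
+   generic illustration and not the worker module's code. The module name,
+   the `run/2` function and the `{done, Result}` exit convention are
+   assumptions made for this example.
+   ```erlang
+   -module(bounded_worker_sketch).
+   -export([run/2]).
+   %% Run Fun in a monitored helper process and wait at most TimeoutMsec
+   %% for it. The helper hands its result back as its exit reason.
+   run(Fun, TimeoutMsec) when is_function(Fun, 0) ->
+       {Pid, Ref} = spawn_monitor(fun() -> exit({done, Fun()}) end),
+       receive
+           {'DOWN', Ref, process, Pid, {done, Result}} ->
+               {ok, Result};
+           {'DOWN', Ref, process, Pid, Reason} ->
+               {error, Reason}
+       after TimeoutMsec ->
+           %% Stop waiting: drop the monitor (flushing any 'DOWN'
+           %% message already queued) and kill the helper.
+           erlang:demonitor(Ref, [flush]),
+           exit(Pid, kill),
+           {error, timeout}
+       end.
+   ```
+   The caller gets `{ok, Result}`, `{error, Reason}` or `{error, timeout}`,
+   and never waits longer than roughly `TimeoutMsec`.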
+   A doc processor worker will either succeed or fail but never block for too
+   long. Success and failure are returned as exit values. Those are handled in
+   the `worker_returned/3` doc processor clauses. The most common pattern is
+   that a worker is spawned to add a replication job, it does so and returns a
+   `{ok, ReplicationID}` value in `worker_returned`.
+   In case of a filtered replication with custom user code there are two cases
+   to consider:
+     1. Filter fetching code has failed. In that case the worker returns an
+        error. But because the error could be a transient network error,
+        another worker is started to try again. It could fail and return an
+        error again, then another one is started and so on. However each
+        consecutive worker will do an exponential backoff, not unlike the
+        scheduler code. `error_backoff/1` is where the backoff period is
+        calculated. Consecutive errors are held in the `errcnt` field in the
+        ETS table.
+     2. Fetching filter code succeeds, the replication ID is calculated and
+        the job is added to the scheduler. However, because this is a filtered
+        replication, the source database could get an updated filter, which
+        means the replication ID should change again. So a worker is spawned
+        again even if the previous worker just returned successfully. The
+        purpose is to check the filter and see if it changed. In other words,
+        the doc processor will do the work of checking filtered replications,
+        get an updated filter and then refresh the replication job (remove the
+        old one and add a new one with a different ID). The filter checking
+        interval is determined by the `filter_backoff` function. An unusual
+        thing about that function is that it calculates the period based on
+        the size of the ETS table. The intuition is that when there are few
+        replications in a cluster, it's ok to check the filter for changes
+        often. When there are lots of replications running, having each one
+        check its filter often is not a good idea.
+ * `couch_replicator`: This is an unusual but useful pattern. This child is not
+   an actual process but a one-time call to the
+   `couch_replicator:ensure_rep_db_exists/0` function, executed by the
+   supervisor in the correct order (and monitored for crashes). This ensures
+   the local replicator db exists, then returns `ignore`. This pattern is
+   useful for doing setup-like things at the top level and in the correct order
+   regarding the rest of the children in the supervisor.
+ * `couch_replicator_db_changes`: This process specializes and configures
+   `couch_multidb_changes` so that it looks for `_replicator` suffixed shards
+   and makes sure to restart when cluster configuration changes. This restart
+   on cluster membership changes is often referred to as a "rescan".
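+The one-time setup child pattern used by the `couch_replicator` child above
+can be sketched as follows. This is a generic illustration and not the actual
+`couch_replicator_sup` code. The module name, the child id and the `setup/0`
+function are made up for the example.
+```erlang
+-module(setup_child_sup_sketch).
+-behaviour(supervisor).
+-export([start_link/0, init/1, setup/0]).
+start_link() ->
+    supervisor:start_link(?MODULE, []).
+%% Children are started in list order, so anything listed after the
+%% setup entry can rely on the setup having completed.
+init([]) ->
+    SupFlags = #{strategy => rest_for_one, intensity => 1, period => 5},
+    Children = [
+        #{id => one_time_setup,
+          start => {?MODULE, setup, []},
+          restart => temporary}
+        %% real worker child specs would follow here
+    ],
+    {ok, {SupFlags, Children}}.
+%% Called by the supervisor like any other child start function. It does
+%% its (here empty) setup work and returns ignore, so no process is kept.
+setup() ->
+    ignore.
+```
+Returning `ignore` from a start function is standard supervisor behavior:
+the supervisor moves on to the next child without keeping a process around
+for this one.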
