couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [14/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:18 GMT
Split couch_replicator.erl into 2 modules.

`couch_replicator_doc_processor`
  - Handles multi-db changes callbacks so it starts replications
    defined in _replicator docs.

`couch_replicator`
  - Handles `_replicate` http endpoint and creating local
    `_replicator` db.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/fd06ae1d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/fd06ae1d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/fd06ae1d

Branch: refs/heads/63012-defensive
Commit: fd06ae1d23f5e370650f2f9c708ecb8599181668
Parents: 72e1f1e
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Mon May 23 17:20:20 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Mon May 23 17:20:20 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl               | 146 +--------------------------
 src/couch_replicator_doc_processor.erl | 150 ++++++++++++++++++++++++++++
 src/couch_replicator_sup.erl           |   8 +-
 3 files changed, 158 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fd06ae1d/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 996bda2..82eff26 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -11,33 +11,19 @@
 % the License.
 
 -module(couch_replicator).
--behaviour(couch_multidb_changes).
 
-% public API
--export([replicate/2]).
-
-% called from couch_replicator_sup supervisor
--export([ensure_rep_db_exists/0]).
-
-% multidb changes callback
--export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
+-export([replicate/2, ensure_rep_db_exists/0]).
+-export([rep_state/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3,
-    pp_rep_id/1
-]).
-
 -import(couch_util, [
     get_value/2,
     get_value/3
 ]).
 
 
-
 -spec replicate({[_]}, #user_ctx{}) ->
     {ok, {continuous, binary()}} |
     {ok, {[_]}} |
@@ -69,34 +55,6 @@ ensure_rep_db_exists() ->
     ignore.
 
 
-%%%%%% Multidb changes callbacks
-
-db_created(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_deleted(DbName, Server) ->
-    clean_up_replications(DbName),
-    Server.
-
-db_found(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_change(DbName, {ChangeProps} = Change, Server) ->
-    try
-        ok = process_update(DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error)
-    end,
-    Server.
-
-
-%%%%%%%%% Private helper functions
-
 -spec do_replication_loop(#rep{}) ->
     {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
 do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
@@ -114,6 +72,7 @@ do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} =
Rep) ->
         wait_for_result(Id)
     end.
 
+
 -spec rep_result_listener(rep_id()) -> {ok, pid()}.
 rep_result_listener(RepId) ->
     ReplyTo = self(),
@@ -124,6 +83,7 @@ rep_result_listener(RepId) ->
                 ok
         end).
 
+
 -spec wait_for_result(rep_id()) ->
     {ok, any()} | {error, any()}.
 wait_for_result(RepId) ->
@@ -170,104 +130,6 @@ cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
      end.
 
 
-
--spec process_update(binary(), {[_]}) -> ok.
-process_update(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
-    case {Owner, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        remove_jobs(DbName, DocId);
-    {unstable, false} ->
-	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {ThisNode, false} when ThisNode =:= node() ->
-        couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(DbName, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(DbName, DocId, JsonRepDoc);
-        <<"completed">> ->
-            couch_log:notice("Replication '~s' marked as completed", [DocId])
-        end;
-     {Owner, false} ->
-         couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner])
-    end,
-    ok.
-
-
--spec maybe_start_replication(binary(), binary(), {[_]}) -> ok.
-maybe_start_replication(DbName, DocId, RepDoc) ->
-    Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
-    #rep{id = {BaseId, _} = RepId} = Rep0,
-    Rep = Rep0#rep{db_name = DbName},
-    case rep_state(RepId) of
-    nil ->
-        couch_log:notice("Attempting to start replication `~s` (document `~s`).",
-            [pp_rep_id(RepId), DocId]),
-        case couch_replicator_scheduler:add_job(Rep) of
-        ok ->
-            ok;
-        {error, already_added} ->
-            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
-        end,
-        ok;
-    #rep{doc_id = DocId} ->
-        ok;
-    #rep{db_name = DbName, doc_id = OtherDocId} ->
-        couch_log:notice("The replication specified by the document `~s` already started"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId))
-    end,
-    ok.
-
-
--spec maybe_tag_rep_doc(binary(), binary(), {[_]}, binary()) -> ok.
-maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
-    case get_json_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
-    end.
-
-
--spec remove_jobs(binary(), binary()) -> ok.
-remove_jobs(DbName, DocId) ->
-    LogMsg = "Stopped replication `~s` , replication document `~s`",
-    [
-        begin
-            couch_replicator_scheduler:remove_job(RepId),
-            couch_log:notice(LogMsg, [pp_rep_id(RepId), DocId])
-        end || RepId <- find_jobs_by_doc(DbName, DocId)
-    ],
-    ok.
-
-
-% TODO: make this a function in couch_replicator_scheduler API
--spec clean_up_replications(binary()) -> ok.
-clean_up_replications(DbName) ->
-    RepIds = find_jobs_by_dbname(DbName),
-    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% TODO: make this a function in couch_replicator_scheduler API
--spec find_jobs_by_dbname(binary()) -> list(#rep{}).
-find_jobs_by_dbname(DbName) ->
-    RepSpec = #rep{db_name = DbName, _ = '_'},
-    MatchSpec = {job, '$1', RepSpec, '_', '_'},
-    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
-
-
-% TODO: make this a function in couch_replicator_scheduler API
--spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
-find_jobs_by_doc(DbName, DocId) ->
-    RepSpec =  #rep{db_name = DbName, doc_id = DocId, _ = '_'},
-    MatchSpec = {job, '$1', RepSpec, '_', '_'},
-    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
-
-
 % TODO: make this a function in couch_replicator_scheduler API
 -spec rep_state(rep_id()) -> #rep{} | nil.
 rep_state(RepId) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fd06ae1d/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
new file mode 100644
index 0000000..ddaa2b0
--- /dev/null
+++ b/src/couch_replicator_doc_processor.erl
@@ -0,0 +1,150 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_processor).
+-behaviour(couch_multidb_changes).
+
+% multidb changes callback
+-export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3,
+    pp_rep_id/1
+]).
+
+
+db_created(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_deleted(DbName, Server) ->
+    clean_up_replications(DbName),
+    Server.
+
+db_found(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_change(DbName, {ChangeProps} = Change, Server) ->
+    try
+        ok = process_update(DbName, Change)
+    catch
+    _Tag:Error ->
+        {RepProps} = get_json_value(doc, ChangeProps),
+        DocId = get_json_value(<<"_id">>, RepProps),
+        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error)
+    end,
+    Server.
+
+
+-spec process_update(binary(), {[_]}) -> ok.
+process_update(DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
+    DocId = get_json_value(<<"_id">>, RepProps),
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    case {Owner, get_json_value(deleted, Change, false)} of
+    {_, true} ->
+        remove_jobs(DbName, DocId);
+    {unstable, false} ->
+	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+    {ThisNode, false} when ThisNode =:= node() ->
+        couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
+        case get_json_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
+        <<"completed">> ->
+            couch_log:notice("Replication '~s' marked as completed", [DocId])
+        end;
+     {Owner, false} ->
+         couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner])
+    end,
+    ok.
+
+
+-spec maybe_start_replication(binary(), binary(), {[_]}) -> ok.
+maybe_start_replication(DbName, DocId, RepDoc) ->
+    Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
+    #rep{id = {BaseId, _} = RepId} = Rep0,
+    Rep = Rep0#rep{db_name = DbName},
+    case couch_replicator:rep_state(RepId) of
+    nil ->
+        couch_log:notice("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
+        case couch_replicator_scheduler:add_job(Rep) of
+        ok ->
+            ok;
+        {error, already_added} ->
+            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
+        end,
+        ok;
+    #rep{doc_id = DocId} ->
+        ok;
+    #rep{db_name = DbName, doc_id = OtherDocId} ->
+        couch_log:notice("The replication specified by the document `~s` already started"
+            " triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId))
+    end,
+    ok.
+
+
+-spec maybe_tag_rep_doc(binary(), binary(), {[_]}, binary()) -> ok.
+maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
+    case get_json_value(<<"_replication_id">>, RepProps) of
+    RepId ->
+        ok;
+    _ ->
+        couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
+    end.
+
+
+-spec remove_jobs(binary(), binary()) -> ok.
+remove_jobs(DbName, DocId) ->
+    LogMsg = "Stopped replication `~s` , replication document `~s`",
+    [
+        begin
+            couch_replicator_scheduler:remove_job(RepId),
+            couch_log:notice(LogMsg, [pp_rep_id(RepId), DocId])
+        end || RepId <- find_jobs_by_doc(DbName, DocId)
+    ],
+    ok.
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec clean_up_replications(binary()) -> ok.
+clean_up_replications(DbName) ->
+    RepIds = find_jobs_by_dbname(DbName),
+    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec find_jobs_by_dbname(binary()) -> list(#rep{}).
+find_jobs_by_dbname(DbName) ->
+    RepSpec = #rep{db_name = DbName, _ = '_'},
+    MatchSpec = {job, '$1', RepSpec, '_', '_'},
+    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
+find_jobs_by_doc(DbName, DocId) ->
+    RepSpec =  #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+    MatchSpec = {job, '$1', RepSpec, '_', '_'},
+    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
+
+
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fd06ae1d/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index 8ed65a8..6870d3c 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -20,10 +20,10 @@ start_link() ->
 
 init(_Args) ->
     MdbChangesArgs = [
-        <<"_replicator">>,  % DbSuffix
-        couch_replicator,   % Module
-        nil,                % Callback context
-        [skip_ddocs]        % Options
+        <<"_replicator">>,               % DbSuffix
+        couch_replicator_doc_processor,  % Module
+        nil,                             % Callback context
+        [skip_ddocs]                     % Options
     ],
     Children = [
         {couch_replicator_scheduler_sup,


Mime
View raw message