couchdb-commits mailing list archives

From rnew...@apache.org
Subject [08/30] couch-replicator commit: updated refs/heads/63012-scheduler-dont-start-immediately to 6a913dc
Date Fri, 03 Jun 2016 15:17:32 GMT
Fix errors in replication manager which prevented replication from running.

Add a few more dialyzer types to couch_replicator_manager.
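
For reference, the added annotations are ordinary dialyzer `-spec` attributes. A minimal sketch of the pattern (the `spec_example` module below is illustrative only, not part of this patch):

```
%% Illustrative only: the added specs pin the success return down to the
%% atom `ok`, so dialyzer can flag callers that still match on the old
%% {ok, _} shape.
-module(spec_example).
-export([replication_started/1]).

-record(rep, {id, doc_id}).

-spec replication_started(#rep{}) -> ok.
replication_started(#rep{id = _RepId}) ->
    ok.
```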

Replications tested with `rep.py` utility:

```
rep.replicate_1_to_n_then_check_replication(2)
creating rdyno_src_0001
 > created  1 dbs with prefix rdyno_src_
creating rdyno_tgt_0001
creating rdyno_tgt_0002
 > created  2 dbs with prefix rdyno_tgt_
updating documents
 > _replicator rdyno_0001_0001 : 1-34f09a6a62181f4d4631bf8544b582fc
 > _replicator rdyno_0001_0002 : 1-0f52a360f959fea67e6cb20aa1bb0606
waiting for replication documents to trigger
 > retrying function wait_to_trigger
 > function wait_to_trigger succeded after 50.088 +/- 10.0  sec.
all replication documents triggered
>>> update cycle 0  <<<
 > waiting for target  1
 > function wait_till_dbs_equal succeded after 0.038 +/- 1.0  sec.
 > waiting for target  2
 > function wait_till_dbs_equal succeded after 0.025 +/- 1.0  sec.
 > waiting to propagate changes from  1 to 2  : 0.120 sec.

```
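
The doc ids in the output suggest one `_replicator` document per source/target pair. As a hedged sketch only, this is roughly the EJSON shape such a document would have once the manager pulls it out of the `doc` field of a change; the exact body `rep.py` writes (and the `continuous` flag) is an assumption based on the repeated update cycles above:

```
%% Assumed shape of rdyno_0001_0002; rep.py's actual document body is not
%% shown in this message.
RepDoc = {[
    {<<"_id">>, <<"rdyno_0001_0002">>},
    {<<"source">>, <<"rdyno_src_0001">>},
    {<<"target">>, <<"rdyno_tgt_0002">>},
    {<<"continuous">>, true}
]}.
```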


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c8dbd7b7
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c8dbd7b7
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c8dbd7b7

Branch: refs/heads/63012-scheduler-dont-start-immediately
Commit: c8dbd7b7f3fdb79f98d9536b8d91bfe9c33b8fc2
Parents: 30c033d
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Mon May 16 18:47:17 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Mon May 16 18:47:17 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 54 +++++++++++++++++++++++------------
 1 file changed, 36 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c8dbd7b7/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 710edc5..63d625e 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -77,7 +77,7 @@ db_change(DbName, Change, Server) ->
     Server.
 
 
-
+-spec replication_started(#rep{}) -> ok.
 replication_started(#rep{id = RepId}) ->
     case rep_state(RepId) of
     nil ->
@@ -89,9 +89,11 @@ replication_started(#rep{id = RepId}) ->
         % now just write triggered for compatibility, in the future do something
         % in the scheduler to handle repeated failed starts
         couch_log:notice("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)])
+            [DocId, pp_rep_id(RepId)]),
+        ok
     end.
 
+-spec replication_completed(#rep{}, list()) -> ok.
 replication_completed(#rep{id = RepId}, Stats) ->
     case rep_state(RepId) of
     nil ->
@@ -100,10 +102,12 @@ replication_completed(#rep{id = RepId}, Stats) ->
         couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
         ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
         couch_log:notice("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId])
+            [pp_rep_id(RepId), DocId]),
+        ok
     end.
 
 
+-spec replication_usurped(#rep{}, node()) -> ok.
 replication_usurped(#rep{id = RepId}, By) ->
     case rep_state(RepId) of
     nil ->
@@ -111,22 +115,25 @@ replication_usurped(#rep{id = RepId}, By) ->
     #rep{doc_id = DocId} ->
         ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
         couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)",
-            [pp_rep_id(RepId), By, DocId])
+            [pp_rep_id(RepId), By, DocId]),
+        ok
     end.
 
-
+-spec replication_error(#rep{}, any()) -> ok.
 replication_error(#rep{id = RepId}, Error) ->
     case rep_state(RepId) of
     nil ->
         ok;
     #rep{db_name = DbName, doc_id = DocId} ->
         % NV: TODO: later, perhaps don't update doc on each error
-         couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error),
+        couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error),
         ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
     end.
 
 
 % NV: TODO: Here need to use the new cluster ownership bit.
+-spec continue(#rep{}) -> {true, no_owner | unstable | node()} |
+    {false, node()}.
 continue(#rep{doc_id = null}) ->
     {true, no_owner};
 continue(#rep{id = RepId}) ->
@@ -226,12 +233,12 @@ terminate(_Reason, #state{rep_start_pids = StartPids}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-
+-spec process_update(#state{}, binary(), tuple()) -> #state{}.
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
-    Owner = couch_replicator_clustering:owner(DbName, DocId),
-    case {Owner, get_json_value(deleted, Change, false)} of
+    OwnerRes = couch_replicator_clustering:owner(DbName, DocId),
+    case {OwnerRes, get_json_value(deleted, Change, false)} of
     {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
@@ -261,7 +268,7 @@ process_update(State, DbName, {Change}) ->
         end
     end.
 
-
+-spec maybe_start_replication(#state{}, binary(), binary(), tuple()) -> #state{}.
 maybe_start_replication(State, DbName, DocId, RepDoc) ->
     Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
     #rep{id = {BaseId, _} = RepId} = Rep0,
@@ -285,7 +292,7 @@ maybe_start_replication(State, DbName, DocId, RepDoc) ->
         State
     end.
 
-
+-spec maybe_tag_rep_doc(binary(), binary(), tuple(), binary()) -> ok.
 maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
     case get_json_value(<<"_replication_id">>, RepProps) of
     RepId ->
@@ -294,26 +301,31 @@ maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
         couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
     end.
 
+-spec start_replication(#rep{}) -> ok.
 start_replication(Rep) ->
     % NV: TODO: Removed splay and back-off sleep on error. Instead to replace that
     % temporarily add some random sleep here. To avoid repeated failed restarts in
     % a loop if source doc is broken
     timer:sleep(random:uniform(1000)),
     case (catch couch_replicator_scheduler:add_job(Rep)) of
-    {ok, _} ->
+    ok ->
         ok;
-    Error ->
+    {error, Error} ->
+        couch_log:error("replicator scheduler add_job ~p failed: ~p", [Rep, Error]),
         replication_error(Rep, Error)
     end.
 
+-spec replication_complete(binary(), binary()) -> ok.
 replication_complete(DbName, DocId) ->
     case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
     [{{DbName, DocId}, _RepId}] ->
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId});
+        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
+        ok;
     _ ->
         ok
     end.
 
+-spec rep_doc_deleted(binary(), binary()) -> ok.
 rep_doc_deleted(DbName, DocId) ->
     case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
     [{{DbName, DocId}, RepId}] ->
@@ -321,12 +333,12 @@ rep_doc_deleted(DbName, DocId) ->
         true = ets:delete(?REP_TO_STATE, RepId),
         true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
         couch_log:notice("Stopped replication `~s` because replication document `~s`"
-            " was deleted", [pp_rep_id(RepId), DocId]);
+            " was deleted", [pp_rep_id(RepId), DocId]),
+        ok;
     [] ->
         ok
     end.
 
-
 replication_error(State, RepId, Error) ->
     case rep_state(RepId) of
     nil ->
@@ -335,6 +347,7 @@ replication_error(State, RepId, Error) ->
         maybe_retry_replication(RepState, Error, State)
     end.
 
+-spec maybe_retry_replication(#rep{}, any(), #state{}) -> #state{}.
 maybe_retry_replication(#rep{id = RepId, doc_id = DocId} = Rep, Error, State) ->
     ErrorBinary = couch_replicator_utils:rep_error_to_binary(Error),
     couch_log:error("Error in replication `~s` (triggered by `~s`): ~s",
@@ -347,6 +360,7 @@ maybe_retry_replication(#rep{id = RepId, doc_id = DocId} = Rep, Error, State) ->
     }.
 
 
+-spec stop_all_replications() -> ok.
 stop_all_replications() ->
     couch_log:notice("Stopping all ongoing replications", []),
     ets:foldl(
@@ -355,9 +369,11 @@ stop_all_replications() ->
         end,
         ok, ?DOC_TO_REP),
     true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP).
+    true = ets:delete_all_objects(?DOC_TO_REP),
+    ok.
 
 
+-spec clean_up_replications(binary()) -> ok.
 clean_up_replications(DbName) ->
     ets:foldl(
         fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
@@ -367,10 +383,12 @@ clean_up_replications(DbName) ->
            ({_,_}, _) ->
             ok
         end,
-        ok, ?DOC_TO_REP).
+        ok, ?DOC_TO_REP),
+    ok.
 
 
 % pretty-print replication id
+-spec pp_rep_id(#rep{}) -> string().
 pp_rep_id(#rep{id = RepId}) ->
     pp_rep_id(RepId);
 pp_rep_id({Base, Extension}) ->

