couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] branch 63012-scheduler updated: [fixup] more style formatting
Date Mon, 10 Apr 2017 15:00:05 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/63012-scheduler by this push:
       new  e612172   [fixup] more style formatting
e612172 is described below

commit e6121724f60cd5643341f0d97d9565a64a668dc5
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Mon Apr 10 10:59:59 2017 -0400

    [fixup] more style formatting
---
 src/couch_replicator/src/couch_multidb_changes.erl |  41 ++-
 src/couch_replicator/src/couch_replicator.erl      |   7 +-
 .../src/couch_replicator_clustering.erl            |   3 -
 .../src/couch_replicator_connection.erl            |   4 -
 .../src/couch_replicator_db_changes.erl            |   1 -
 .../src/couch_replicator_doc_processor.erl         |  10 +-
 src/couch_replicator/src/couch_replicator_docs.erl |   5 +-
 .../src/couch_replicator_filters.erl               |   2 +-
 src/couch_replicator/src/couch_replicator_ids.erl  |   3 +-
 .../src/couch_replicator_manager.erl               |   5 +-
 .../src/couch_replicator_rate_limiter.erl          |   9 +-
 .../src/couch_replicator_rate_limiter_tables.erl   |   1 -
 .../src/couch_replicator_scheduler.erl             |  21 +-
 .../src/couch_replicator_scheduler_job.erl         | 274 +++++++++++----------
 .../src/couch_replicator_scheduler_sup.erl         |  10 +-
 15 files changed, 204 insertions(+), 192 deletions(-)

diff --git a/src/couch_replicator/src/couch_multidb_changes.erl b/src/couch_replicator/src/couch_multidb_changes.erl
index d8e4e05..ba624a4 100644
--- a/src/couch_replicator/src/couch_multidb_changes.erl
+++ b/src/couch_replicator/src/couch_multidb_changes.erl
@@ -14,7 +14,6 @@
 
 -behaviour(gen_server).
 
-
 -export([
    start_link/4
 ]).
@@ -187,13 +186,10 @@ register_with_event_server(Server) ->
 -spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}.
 db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
     State#state{ctx = Mod:db_created(DbName, Ctx)};
-
 db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
     State#state{ctx = Mod:db_deleted(DbName, Ctx)};
-
 db_callback(updated, DbName, State) ->
     resume_scan(DbName, State);
-
 db_callback(_Other, _DbName, State) ->
     State.
 
@@ -249,11 +245,9 @@ changes_reader(Server, DbName, Since) ->
 changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
     ok = gen_server:call(Server, {change, DbName, Change}, infinity),
     {Server, DbName};
-
 changes_reader_cb({stop, EndSeq}, _, {Server, DbName}) ->
     ok = gen_server:call(Server, {checkpoint, DbName, EndSeq}, infinity),
     {Server, DbName};
-
 changes_reader_cb(_, _, Acc) ->
     Acc.
 
@@ -318,7 +312,6 @@ is_design_doc({Change}) ->
 
 is_design_doc_id(<<?DESIGN_DOC_PREFIX, _/binary>>) ->
     true;
-
 is_design_doc_id(_) ->
     false.
 
@@ -367,6 +360,7 @@ couch_multidb_changes_test_() ->
         ]
     }.
 
+
 setup() ->
     mock_logs(),
     mock_callback_mod(),
@@ -403,6 +397,7 @@ t_handle_call_change() ->
         ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
     end).
 
+
 t_handle_call_change_filter_design_docs() ->
     ?_test(begin
         State0 = mock_state(),
@@ -413,6 +408,7 @@ t_handle_call_change_filter_design_docs() ->
         ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
     end).
 
+
 t_handle_call_checkpoint_new() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -422,6 +418,7 @@ t_handle_call_checkpoint_new() ->
         ets:delete(Tid)
     end).
 
+
 t_handle_call_checkpoint_existing() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -432,6 +429,7 @@ t_handle_call_checkpoint_existing() ->
         ets:delete(Tid)
     end).
 
+
 t_handle_info_created() ->
     ?_test(begin
         State = mock_state(),
@@ -440,6 +438,7 @@ t_handle_info_created() ->
         ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
     end).
 
+
 t_handle_info_deleted() ->
      ?_test(begin
         State = mock_state(),
@@ -448,6 +447,7 @@ t_handle_info_deleted() ->
         ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
     end).
 
+
 t_handle_info_updated() ->
      ?_test(begin
         Tid = mock_ets(),
@@ -457,12 +457,14 @@ t_handle_info_updated() ->
         ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
     end).
 
+
 t_handle_info_other_event() ->
      ?_test(begin
         State = mock_state(),
         handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State)
     end).
 
+
 t_handle_info_created_other_db() ->
      ?_test(begin
         State = mock_state(),
@@ -470,6 +472,7 @@ t_handle_info_created_other_db() ->
         ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
     end).
 
+
 t_handle_info_scanner_exit_normal() ->
     ?_test(begin
         Res = handle_info({'EXIT', spid, normal}, mock_state()),
@@ -478,18 +481,21 @@ t_handle_info_scanner_exit_normal() ->
         ?assertEqual(nil, RState#state.scanner)
     end).
 
+
 t_handle_info_scanner_crashed() ->
     ?_test(begin
         Res = handle_info({'EXIT', spid, oops}, mock_state()),
         ?assertMatch({stop, {scanner_died, oops}, _State}, Res)
     end).
 
+
 t_handle_info_event_server_exited() ->
     ?_test(begin
         Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
         ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
     end).
 
+
 t_handle_info_unknown_pid_exited() ->
     ?_test(begin
         State0 = mock_state(),
@@ -500,6 +506,7 @@ t_handle_info_unknown_pid_exited() ->
         ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
     end).
 
+
 t_handle_info_change_feed_exited() ->
     ?_test(begin
         Tid0 = mock_ets(),
@@ -518,6 +525,7 @@ t_handle_info_change_feed_exited() ->
         ets:delete(Tid1)
     end).
 
+
 t_handle_info_change_feed_exited_and_need_rescan() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -536,6 +544,7 @@ t_handle_info_change_feed_exited_and_need_rescan() ->
         ets:delete(Tid)
     end).
 
+
 t_spawn_changes_reader() ->
     ?_test(begin
         Pid = start_changes_reader(?DBNAME, 3),
@@ -554,6 +563,7 @@ t_spawn_changes_reader() ->
             }, {json_req, null}, db]))
     end).
 
+
 t_changes_reader_cb_change() ->
     ?_test(begin
         {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
@@ -565,6 +575,7 @@ t_changes_reader_cb_change() ->
         exit(Pid, kill)
     end).
 
+
 t_changes_reader_cb_stop() ->
     ?_test(begin
         {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
@@ -577,9 +588,11 @@ t_changes_reader_cb_stop() ->
         exit(Pid, kill)
     end).
 
+
 t_changes_reader_cb_other() ->
     ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
 
+
 t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -603,6 +616,7 @@ t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
         ets:delete(Tid)
     end).
 
+
 t_handle_call_resume_scan_chfeed_no_ets_entry() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -615,6 +629,7 @@ t_handle_call_resume_scan_chfeed_no_ets_entry() ->
         kill_mock_change_reader_and_get_its_args(Pid)
     end).
 
+
 t_handle_call_resume_scan_chfeed_ets_entry() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -628,6 +643,7 @@ t_handle_call_resume_scan_chfeed_ets_entry() ->
         kill_mock_change_reader_and_get_its_args(Pid)
     end).
 
+
 t_handle_call_resume_scan_no_chfeed_ets_entry() ->
     ?_test(begin
         Tid = mock_ets(),
@@ -650,6 +666,7 @@ t_handle_call_resume_scan_no_chfeed_ets_entry() ->
         ets:delete(Tid)
     end).
 
+
 t_start_link() ->
     ?_test(begin
         {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
@@ -666,6 +683,7 @@ t_start_link() ->
         ?assert(meck:called(couch_event, register_all, [Pid]))
     end).
 
+
 t_start_link_no_ddocs() ->
     ?_test(begin
         {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
@@ -681,6 +699,7 @@ t_start_link_no_ddocs() ->
         exit(Pid, kill)
     end).
 
+
 t_misc_gen_server_callbacks() ->
     ?_test(begin
         ?assertEqual(ok, terminate(reason, state)),
@@ -786,15 +805,18 @@ kill_mock_change_reader_and_get_its_args(Pid) ->
             erlang:error(spawn_change_reader_timeout)
     end.
 
+
 mock_changes_reader() ->
     meck:expect(couch_changes, handle_db_changes,
         fun(_ChArgs, _Req, db) ->
             fun mock_changes_reader_loop/1
         end).
 
+
 mock_ets() ->
     ets:new(multidb_test_ets, [set, public]).
 
+
 mock_state() ->
     #state{
         mod = ?MOD,
@@ -804,10 +826,12 @@ mock_state() ->
         scanner = spid,
         pids = []}.
 
+
 mock_state(Ets) ->
     State = mock_state(),
     State#state{tid = Ets}.
 
+
 mock_state(Ets, Pid) ->
     State = mock_state(Ets),
     State#state{pids = [{?DBNAME, Pid}]}.
@@ -821,10 +845,13 @@ change_row(Id) when is_binary(Id) ->
         {doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
     ]}.
 
+
 handle_call_ok(Msg, State) ->
     ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
 
+
 handle_info_check(Msg, State) ->
     ?assertMatch({noreply, _}, handle_info(Msg, State)).
 
+
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index df5a347..0b66331 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -182,7 +182,6 @@ stream_active_docs_info(Cb, UserAcc, States) ->
     any().
 stream_active_docs_info([], _Cb, UserAcc, _States) ->
     UserAcc;
-
 stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
     case rpc:call(Node, couch_replicator_doc_processor, docs, [States]) of
         {badrpc, Reason} ->
@@ -201,10 +200,10 @@ stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
     ({meta, any()}, query_acc()) -> {ok,  query_acc()};
     (complete, query_acc()) -> {ok, query_acc()}.
 handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
-    DocId = couch_util:get_value(id, Props),
-    DocStateBin = couch_util:get_value(key, Props),
+    DocId = get_value(id, Props),
+    DocStateBin = get_value(key, Props),
     DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
-    MapValue = couch_util:get_value(value, Props),
+    MapValue = get_value(value, Props),
     {Source, Target, StartTime, StateTime, StateInfo} = case MapValue of
         [Src, Tgt, StartT, StateT, Info] ->
             {Src, Tgt, StartT, StateT, Info};
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 585916a..7231ece 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -27,10 +27,8 @@
 -module(couch_replicator_clustering).
 
 -behaviour(gen_server).
-
 -behaviour(config_listener).
 
-
 -export([
    start_link/0
 ]).
@@ -225,7 +223,6 @@ handle_config_change(_, _, _, _, S) ->
 
 
 handle_config_terminate(_, stop, _) -> ok;
-
 handle_config_terminate(_S, _R, _St) ->
     Pid = whereis(?MODULE),
     erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index c1f1048..627f95b 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -13,10 +13,8 @@
 -module(couch_replicator_connection).
 
 -behavior(gen_server).
-
 -behavior(config_listener).
 
-
 -export([
     start_link/0
 ]).
@@ -40,10 +38,8 @@
     handle_config_terminate/3
 ]).
 
-
 -include_lib("ibrowse/include/ibrowse.hrl").
 
-
 -define(DEFAULT_CLOSE_INTERVAL, 90000).
 -define(RELISTEN_DELAY, 5000).
 
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
index f58eac8..92b0222 100644
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ b/src/couch_replicator/src/couch_replicator_db_changes.erl
@@ -101,7 +101,6 @@ restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
 -spec stop_mdb_changes(#state{}) -> #state{}.
 stop_mdb_changes(#state{mdb_changes = nil} = State) ->
     State;
-
 stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
     couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
     unlink(Pid),
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 4ef7c95..8b19836 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
 -module(couch_replicator_doc_processor).
 
 -behaviour(gen_server).
-
 -behaviour(couch_multidb_changes).
 
 -export([
@@ -44,7 +43,6 @@
     notify_cluster_event/2
 ]).
 
-
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 
@@ -396,6 +394,7 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
     end,
     ok.
 
+
 -spec maybe_update_doc_error(#rep{}, any()) -> ok.
 maybe_update_doc_error(Rep, Reason) ->
     case update_docs() of
@@ -405,6 +404,7 @@ maybe_update_doc_error(Rep, Reason) ->
             ok
     end.
 
+
 -spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
 maybe_update_doc_triggered(Rep, RepId) ->
     case update_docs() of
@@ -467,13 +467,12 @@ maybe_start_worker(Id) ->
 -spec get_worker_wait(#rdoc{}) -> seconds().
 get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
     filter_backoff();
-
 get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
     error_backoff(ErrCnt);
-
 get_worker_wait(#rdoc{state = initializing}) ->
     0.
 
+
 -spec update_docs() -> boolean().
 update_docs() ->
     config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
@@ -575,11 +574,9 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
     ]}.
 
 
-
 -spec ejson_doc_state_filter(atom(), [atom()]) -> boolean().
 ejson_doc_state_filter(_DocState, []) ->
     true;
-
 ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
@@ -946,5 +943,4 @@ bad_change() ->
         ]}}
     ]}.
 
-
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index f0d3157..6859d5b 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -451,7 +451,6 @@ parse_rep_db(undefined, _Proxy, _Options) ->
 -spec maybe_add_trailing_slash(binary() | list()) -> list().
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
     maybe_add_trailing_slash(?b2l(Url));
-
 maybe_add_trailing_slash(Url) ->
     case lists:last(Url) of
     $/ ->
@@ -561,6 +560,7 @@ check_options(Options) ->
                 "`doc_ids`,`filter`,`selector` are mutually exclusive"})
     end.
 
+
 -spec parse_proxy_params(binary() | [_]) -> [_].
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
@@ -716,6 +716,7 @@ check_options_pass_values_test() ->
     ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
     ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
 
+
 check_options_fail_values_test() ->
     ?assertThrow({bad_request, _},
         check_options([{doc_ids, x}, {filter, y}])),
@@ -726,6 +727,7 @@ check_options_fail_values_test() ->
     ?assertThrow({bad_request, _},
         check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
 
+
 check_convert_options_pass_test() ->
     ?assertEqual([], convert_options([])),
     ?assertEqual([], convert_options([{<<"random">>, 42}])),
@@ -740,6 +742,7 @@ check_convert_options_pass_test() ->
     ?assertEqual([{selector, {key, value}}],
         convert_options([{<<"selector">>, {key, value}}])).
 
+
 check_convert_options_fail_test() ->
     ?assertThrow({bad_request, _},
         convert_options([{<<"cancel">>, <<"true">>}])),
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index 96a6ea1..5668820 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -152,7 +152,6 @@ fetch_internal(DDocName, FilterName, Source, UserCtx) ->
     end.
 
 
-
 -spec query_params([_]) -> {[_]}.
 query_params(Options)->
     couch_util:get_value(query_params, Options, {[]}).
@@ -202,6 +201,7 @@ ejsort_basic_values_test() ->
     ?assertEqual(ejsort([]), []),
     ?assertEqual(ejsort({[]}), {[]}).
 
+
 ejsort_compound_values_test() ->
     ?assertEqual(ejsort([2, 1, 3, <<"a">>]), [2, 1, 3, <<"a">>]),
     Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 175b3e5..7f26db7 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -32,6 +32,7 @@ replication_id(#rep{options = Options} = Rep) ->
     BaseId = replication_id(Rep, ?REP_ID_VERSION),
     {BaseId, maybe_append_options([continuous, create_target], Options)}.
 
+
 % Versioned clauses for generating replication IDs.
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
@@ -69,10 +70,8 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
 -spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
 convert(Id) when is_list(Id) ->
     convert(?l2b(Id));
-
 convert(Id) when is_binary(Id) ->
     lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-
 convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
     Id.
 
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
index 6a79ce6..afccc0b 100644
--- a/src/couch_replicator/src/couch_replicator_manager.erl
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -12,8 +12,9 @@
 
 -module(couch_replicator_manager).
 
-% TODO: This is a temporary proxy module to external calls (outside replicator) to other
-% replicator modules. This is done to avoid juggling multiple repos during development.
+% TODO: This is a temporary proxy module to external calls (outside replicator)
+%  to other replicator modules. This is done to avoid juggling multiple repos
+% during development.
 
 % NV: TODO: These functions were moved to couch_replicator_docs
 % but it is still called from fabric_doc_update. Keep it here for now
diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter.erl b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
index 7d5f4b7..e84ae99 100644
--- a/src/couch_replicator/src/couch_replicator_rate_limiter.erl
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
@@ -41,7 +41,6 @@
 
 -behaviour(gen_server).
 
-
 -export([
    start_link/0
 ]).
@@ -62,9 +61,7 @@
    success/1
 ]).
 
-
 % Types
-
 -type key() :: any().
 -type interval() :: non_neg_integer().
 -type msec() :: non_neg_integer().
@@ -72,7 +69,6 @@
 
 % Definitions
 
-
 % Main parameters of the algorithm. The factor is the multiplicative part and
 % base interval is the additive.
 -define(BASE_INTERVAL, 20).
@@ -241,13 +237,10 @@ time_decay(_Dt, Interval) ->
 -spec additive_factor(interval()) -> interval().
 additive_factor(Interval) when Interval > 10000 ->
     ?BASE_INTERVAL * 50;
-
- additive_factor(Interval) when Interval > 1000 ->
+additive_factor(Interval) when Interval > 1000 ->
     ?BASE_INTERVAL * 5;
-
 additive_factor(Interval) when Interval > 100 ->
     ?BASE_INTERVAL * 2;
-
 additive_factor(_Interval) ->
     ?BASE_INTERVAL.
 
diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
index bad3264..dd5cdaa 100644
--- a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
@@ -31,7 +31,6 @@
    term_to_table/1
 ]).
 
-
 -define(SHARDS_N, 16).
 
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 233f31b..18118e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -13,10 +13,8 @@
 -module(couch_replicator_scheduler).
 
 -behaviour(gen_server).
-
 -behaviour(config_listener).
 
-
 -export([
     start_link/0
 ]).
@@ -50,13 +48,11 @@
     handle_config_terminate/3
 ]).
 
-
 -include("couch_replicator_scheduler.hrl").
 -include("couch_replicator.hrl").
 -include("couch_replicator_api_wrap.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
-
 %% types
 -type event_type() :: added | started | stopped | {crashed, any()}.
 -type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
@@ -163,7 +159,6 @@ job_summary(JobId, HealthThreshold) ->
 
 job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
     list_to_binary(couch_util:url_strip_password(ProxyUrl));
-
 job_proxy_url(_Endpoint) ->
     null.
 
@@ -188,8 +183,6 @@ find_jobs_by_doc(DbName, DocId) ->
     [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
 
 
-
-
 %% gen_server functions
 
 init(_) ->
@@ -331,8 +324,7 @@ handle_config_terminate(_, _, _) ->
     erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
 
 
-%% private functions
-
+%% Private functions
 
 % Handle crashed jobs. Handling differs between transient and permanent jobs.
 % Transient jobs are those posted to the _replicate endpoint. They don't have a
@@ -389,6 +381,7 @@ maybe_start_newly_added_job(Job, State) ->
             ok
     end.
 
+
 % Return up to a given number of oldest, not recently crashed jobs. Try to be
 % memory efficient and use ets:foldl to accumulate jobs.
 -spec pending_jobs(non_neg_integer()) -> [#job{}].
@@ -621,6 +614,7 @@ job_by_pid(Pid) when is_pid(Pid) ->
             {ok, Job}
     end.
 
+
 -spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
 job_by_id(Id) ->
     case ets:lookup(?MODULE, Id) of
@@ -705,6 +699,7 @@ start_pending_jobs(State, Running, Pending) ->
         ok
     end.
 
+
 -spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
 rotate_jobs(State, Running, Pending) ->
     #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
@@ -764,24 +759,20 @@ stats_fold(#job{pid = undefined, history = [{added, T}]}, Acc) ->
     #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
     Dt = round(timer:now_diff(Now, T) / 1000000),
     Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-
 stats_fold(#job{pid = undefined, history = [{stopped, T} | _]}, Acc) ->
     #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
     Dt = round(timer:now_diff(Now, T) / 1000000),
     Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-
 stats_fold(#job{pid = undefined, history = [{{crashed, _}, T} | _]}, Acc) ->
     #stats_acc{now = Now, crashed_t = SumT, crashed_n = Cnt} = Acc,
     Dt = round(timer:now_diff(Now, T) / 1000000),
     Acc#stats_acc{crashed_t = SumT + Dt, crashed_n = Cnt + 1};
-
 stats_fold(#job{pid = P, history = [{started, T} | _]}, Acc) when is_pid(P) ->
     #stats_acc{now = Now, running_t = SumT, running_n = Cnt} = Acc,
     Dt = round(timer:now_diff(Now, T) / 1000000),
     Acc#stats_acc{running_t = SumT + Dt, running_n = Cnt + 1}.
 
 
-
 -spec avg(Sum :: non_neg_integer(), N :: non_neg_integer()) ->
     non_neg_integer().
 avg(_Sum, 0) ->
@@ -881,7 +872,6 @@ maybe_optimize_job_for_rate_limiting(Job = #job{history =
     ],
     Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
     Job#job{rep = Rep};
-
 maybe_optimize_job_for_rate_limiting(Job) ->
     Job.
 
@@ -900,6 +890,7 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
 
 
 -ifdef(TEST).
+
 -include_lib("eunit/include/eunit.hrl").
 
 
@@ -1414,8 +1405,8 @@ stopped() ->
 stopped(WhenSec) ->
     {stopped, {0, WhenSec, 0}}.
 
+
 added() ->
     {added, {0, 0, 0}}.
 
 -endif.
-
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 4d80af0..4534f24 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -11,24 +11,28 @@
 % the License.
 
 -module(couch_replicator_scheduler_job).
+
 -behaviour(gen_server).
--vsn(1).
+
+-export([
+   start_link/1
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3,
+   format_status/2
+]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator_scheduler.hrl").
 -include("couch_replicator.hrl").
 
-%% public api
--export([start_link/1]).
-
-%% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
--export([format_status/2]).
-
-
-%% imports
 -import(couch_util, [
     get_value/2,
     get_value/3,
@@ -42,9 +46,10 @@
 ]).
 
 
-%% definitions
 -define(LOWEST_SEQ, 0).
 -define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+
+
 -record(rep_state, {
     rep_details,
     source_name,
@@ -81,6 +86,7 @@
     view = nil
 }).
 
+
 start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
     RepChildId = BaseId ++ Ext,
     Source = couch_replicator_api_wrap:db_uri(Src),
@@ -98,9 +104,11 @@ start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
             {error, Reason}
     end.
 
+
 init(InitArgs) ->
     {ok, InitArgs, 0}.
 
+
 do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     process_flag(trap_exit, true),
 
@@ -203,13 +211,75 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         }
     }.
 
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
 
-adjust_maxconn(Src, _RepId) ->
-    Src.
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+    {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From,
+    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok),
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+    [] ->
+        {Seq, []};
+    [Seq | Rest] ->
+        {Seq, Rest};
+    [_ | _] ->
+        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    NewThroughSeq = case NewSeqsInProgress of
+    [] ->
+        lists:max([NewThroughSeq0, NewHighestDone]);
+    _ ->
+        NewThroughSeq0
+    end,
+    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    NewState = State#rep_state{
+        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone
+    },
+    update_task(NewState),
+    {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+    #rep_state{source = #db{name = DbName} = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #rep_state{target = #db{name = DbName} = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Error, State}
+    end;
+
+handle_cast({report_seq, Seq},
+    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
 
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
@@ -295,116 +365,6 @@ handle_info(timeout, InitArgs) ->
             {stop, {shutdown, ShutdownReason}, ShutdownState}
     end.
 
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    case do_checkpoint(State) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {noreply, NewState#rep_state{timer = start_timer(State)}};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-        {stop, Error, State}
-    end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
-    {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
-    lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
-    Value = case string:to_lower(Key) of
-    "authorization" ->
-        "****";
-    _ ->
-        Value0
-    end,
-    headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
-    HttpDb#httpdb{
-        url = couch_util:url_strip_password(Url),
-        headers = headers_strip_creds(Headers, [])
-    };
-httpdb_strip_creds(LocalDb) ->
-    LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
-    State#rep_state{
-        rep_details = rep_strip_creds(Rep),
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
 
 terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
     checkpoint_history = CheckpointHistory} = State) ->
@@ -471,10 +431,60 @@ terminate_cleanup(State) ->
     couch_replicator_api_wrap:db_close(State#rep_state.target).
 
 
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+    {ok, State}.
+
+
 format_status(_Opt, [_PDict, State]) ->
     [{data, [{"State", state_strip_creds(State)}]}].
 
 
+headers_strip_creds([], Acc) ->
+    lists:reverse(Acc);
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+    Value = case string:to_lower(Key) of
+    "authorization" ->
+        "****";
+    _ ->
+        Value0
+    end,
+    headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+    HttpDb#httpdb{
+        url = couch_util:url_strip_password(Url),
+        headers = headers_strip_creds(Headers, [])
+    };
+httpdb_strip_creds(LocalDb) ->
+    LocalDb.
+
+
+rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
+    Rep#rep{
+        source = httpdb_strip_creds(Source),
+        target = httpdb_strip_creds(Target)
+    }.
+
+
+state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
+    % #rep_state contains the source and target at the top level and also
+    % in the nested #rep_details record
+    State#rep_state{
+        rep_details = rep_strip_creds(Rep),
+        source = httpdb_strip_creds(Source),
+        target = httpdb_strip_creds(Target)
+    }.
+
+
+adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+    couch_log:notice(Msg, [RepId]),
+    Src#httpdb{http_connections = 2};
+adjust_maxconn(Src, _RepId) ->
+    Src.
+
+
 -spec doc_update_triggered(#rep{}) -> ok.
 doc_update_triggered(#rep{db_name = null}) ->
     ok;
@@ -489,6 +499,7 @@ doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
         [DocId, pp_rep_id(RepId)]),
     ok.
 
+
 -spec doc_update_completed(#rep{}, list()) -> ok.
 doc_update_completed(#rep{db_name = null}, _Stats) ->
     ok;
@@ -500,6 +511,7 @@ doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
         [pp_rep_id(RepId), DocId]),
     ok.
 
+
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
     {stop, normal, cancel_timer(State)};
@@ -624,6 +636,7 @@ spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
         changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
     end).
 
+
 changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
     receive
     {get_changes, From} ->
@@ -752,6 +765,7 @@ update_checkpoint(Db, Doc, DbType) ->
                 " checkpoint document: ", (to_binary(Reason))/binary>>})
     end.
 
+
 update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
     try
         case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
@@ -831,6 +845,7 @@ compare_replication_logs(SrcDoc, TgtDoc) ->
         compare_rep_history(SourceHistory, TargetHistory)
     end.
 
+
 compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
     couch_log:notice("no common ancestry -- performing full replication", []),
     {?LOWEST_SEQ, []};
@@ -872,6 +887,7 @@ db_monitor(#db{} = Db) ->
 db_monitor(_HttpDb) ->
     nil.
 
+
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),
@@ -938,9 +954,7 @@ rep_stats(State) ->
 
 replication_start_error({unauthorized, DbUri}) ->
     {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-
 replication_start_error({db_not_found, DbUri}) ->
     {db_not_found, <<"could not open ", DbUri/binary>>};
-
 replication_start_error(Error) ->
     Error.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 17ae55c..8ab55f8 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -14,12 +14,6 @@
 
 -behaviour(supervisor).
 
--vsn(1).
-
-
-%% includes
--include("couch_replicator.hrl").
-
 %% public api
 -export([
     start_link/0,
@@ -33,6 +27,10 @@
 ]).
 
 
+%% includes
+-include("couch_replicator.hrl").
+
+
 %% public functions
 
 start_link() ->

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <commits@couchdb.apache.org>'].

Mime
View raw message