Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 36123200C3D for ; Tue, 14 Mar 2017 20:25:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 34D98160B98; Tue, 14 Mar 2017 19:25:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 05A03160B89 for ; Tue, 14 Mar 2017 20:25:57 +0100 (CET) Received: (qmail 48333 invoked by uid 500); 14 Mar 2017 19:25:57 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 48081 invoked by uid 99); 14 Mar 2017 19:25:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Mar 2017 19:25:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16110DFE61; Tue, 14 Mar 2017 19:25:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vatamane@apache.org To: commits@couchdb.apache.org Date: Tue, 14 Mar 2017 19:25:59 -0000 Message-Id: <8d7a6c9f2b5742f5a325c05e0968949b@git.apache.org> In-Reply-To: <5a40ed67f6bf401bbfa1d0ee49ec37be@git.apache.org> References: <5a40ed67f6bf401bbfa1d0ee49ec37be@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae archived-at: Tue, 14 Mar 2017 19:25:59 -0000 Add support for _scheduler/jobs/ and _scheduler/docs/ Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f1140e94 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f1140e94 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f1140e94 Branch: refs/heads/63012-scheduler Commit: f1140e941d8e417f4f271f2b3c3f81d4f09a2e67 Parents: a1d7554 Author: Nick Vatamaniuc Authored: Fri Oct 21 15:44:53 2016 -0400 Committer: Nick Vatamaniuc Committed: Fri Oct 21 15:55:44 2016 -0400 ---------------------------------------------------------------------- src/couch_replicator.erl | 51 +++++++++++++++ src/couch_replicator_doc_processor.erl | 23 ++++++- src/couch_replicator_docs.erl | 3 +- src/couch_replicator_ids.erl | 13 +++- src/couch_replicator_scheduler.erl | 99 +++++++++++++++++------------ 5 files changed, 143 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl index b888f82..0bfd9f6 100644 --- a/src/couch_replicator.erl +++ b/src/couch_replicator.erl @@ -15,6 +15,7 @@ -export([replicate/2, ensure_rep_db_exists/0]). -export([stream_active_docs_info/3, stream_terminal_docs_info/4]). -export([replication_states/0]). +-export([job/1, doc/3]). -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). @@ -230,3 +231,53 @@ filter_replicator_doc_query(_DocState, []) -> true; filter_replicator_doc_query(State, States) when is_list(States) -> lists:member(State, States). + + +-spec job(binary()) -> {ok, {[_]}} | {error, not_found}. +job(JobId0) when is_binary(JobId0) -> + JobId = couch_replicator_ids:convert(JobId0), + {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]), + case [JobInfo || {ok, JobInfo} <- Res] of + [JobInfo| _] -> + {ok, JobInfo}; + [] -> + {error, not_found} + end. + + +-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}. +doc(RepDb, DocId, UserCtx) -> + {Res, _Bad} = rpc:multicall(couch_replicator_doc_processor, doc, [RepDb, DocId]), + case [DocInfo || {ok, DocInfo} <- Res] of + [DocInfo| _] -> + {ok, DocInfo}; + [] -> + doc_from_db(RepDb, DocId, UserCtx) + end. + + +-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}. +doc_from_db(RepDb, DocId, UserCtx) -> + case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of + {ok, Doc} -> + {Props} = couch_doc:to_json_obj(Doc, []), + State = couch_util:get_value(<<"_replication_state">>, Props, null), + {StateInfo, ErrorCount} = case State of + <<"completed">> -> + {couch_util:get_value(<<"_replication_stats">>, Props, null), 0}; + <<"failed">> -> + {couch_util:get_value(<<"_replication_state_reason">>, Props, null), 1}; + _OtherState -> + {null, 0} + end, + {ok, {[ + {doc_id, DocId}, + {database, RepDb}, + {id, null}, + {state, State}, + {error_count, ErrorCount}, + {info, StateInfo} + ]}}; + {not_found, _Reason} -> + {error, not_found} + end. http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_doc_processor.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl index 60a3ad1..0349fa6 100644 --- a/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator_doc_processor.erl @@ -14,7 +14,7 @@ -behaviour(couch_multidb_changes). -export([start_link/0]). --export([docs/1]). +-export([docs/1, doc/2]). % multidb changes callback -export([db_created/2, db_deleted/2, db_found/2, db_change/3]). @@ -375,6 +375,27 @@ docs(States) -> end, [], ?MODULE). +-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}. +doc(Db, DocId) -> + HealthThreshold = couch_replicator_scheduler:health_threshold(), + Res = ets:foldl(fun(_RDoc, [_] = Acc) -> Acc; + (RDoc, []) -> + {Shard, RDocId} = RDoc#rdoc.id, + case {mem3:dbname(Shard), RDocId} of + {Db, DocId} -> + [ejson_doc(RDoc, HealthThreshold)]; + {_OtherDb, _OtherDocId} -> + [] + end + end, [], ?MODULE), + case Res of + [DocInfo] -> + {ok, DocInfo}; + [] -> + {error, not_found} + end. + + -spec ejson_state_info(binary() | nil) -> binary() | null. ejson_state_info(nil) -> null; http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_docs.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl index 777c8a4..aeb9219 100644 --- a/src/couch_replicator_docs.erl +++ b/src/couch_replicator_docs.erl @@ -438,8 +438,7 @@ convert_options([{<<"cancel">>, V} | R]) -> [{cancel, V} | convert_options(R)]; convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>; IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> - Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)), - [{id, Id} | convert_options(R)]; + [{id, couch_replicator_ids:convert(V)} | convert_options(R)]; convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)-> throw({bad_request, <<"parameter `create_target` must be a boolean">>}); convert_options([{<<"create_target">>, V} | R]) -> http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_ids.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_ids.erl b/src/couch_replicator_ids.erl index 50760b0..565ed9d 100644 --- a/src/couch_replicator_ids.erl +++ b/src/couch_replicator_ids.erl @@ -12,7 +12,7 @@ -module(couch_replicator_ids). --export([replication_id/1, replication_id/2]). +-export([replication_id/1, replication_id/2, convert/1]). -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator_api_wrap.hrl"). @@ -67,6 +67,17 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) -> maybe_append_filters([HostName, Src, Tgt], Rep). +-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}. +convert(Id) when is_list(Id) -> + convert(?l2b(Id)); + +convert(Id) when is_binary(Id) -> + lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id)); + +convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) -> + Id. + + % Private functions maybe_append_filters(Base, http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_scheduler.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl index 3ca417a..b43b36c 100644 --- a/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator_scheduler.erl @@ -24,7 +24,7 @@ -export([start_link/0, add_job/1, remove_job/1, reschedule/0]). -export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]). -export([job_summary/2, health_threshold/0]). --export([jobs/0]). +-export([jobs/0, job/1]). %% gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -748,48 +748,63 @@ ejson_url(DbName) when is_binary(DbName) -> DbName. +-spec job_ejson(#job{}) -> {[_ | _]}. +job_ejson(Job) -> + Rep = Job#job.rep, + Source = ejson_url(Rep#rep.source), + Target = ejson_url(Rep#rep.target), + History = lists:map(fun(Event) -> + EventProps = case Event of + {{crashed, Reason}, _When} -> + [{type, crashed}, {reason, crash_reason_json(Reason)}]; + {Type, _When} -> + [{type, Type}] + end, + {_Type, {_Mega, _Sec, Micros}=When} = Event, + {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When), + ISO8601 = iolist_to_binary(io_lib:format( + "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ", + [Y,Mon,D,H,Min,S,Micros] + )), + {[{timestamp, ISO8601} | EventProps]} + end, Job#job.history), + {BaseID, Ext} = Job#job.id, + Pid = case Job#job.pid of + undefined -> + null; + P when is_pid(P) -> + ?l2b(pid_to_list(P)) + end, + {[ + {id, iolist_to_binary([BaseID, Ext])}, + {pid, Pid}, + {source, iolist_to_binary(Source)}, + {target, iolist_to_binary(Target)}, + {database, Rep#rep.db_name}, + {user, (Rep#rep.user_ctx)#user_ctx.name}, + {doc_id, Rep#rep.doc_id}, + {history, History}, + {node, node()} + ]}. + + -spec jobs() -> [[tuple()]]. jobs() -> ets:foldl(fun(Job, Acc) -> - Rep = Job#job.rep, - Source = ejson_url(Rep#rep.source), - Target = ejson_url(Rep#rep.target), - History = lists:map(fun(Event) -> - EventProps = case Event of - {{crashed, Reason}, _When} -> - [{type, crashed}, {reason, crash_reason_json(Reason)}]; - {Type, _When} -> - [{type, Type}] - end, - {_Type, {_Mega, _Sec, Micros}=When} = Event, - {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When), - ISO8601 = iolist_to_binary(io_lib:format( - "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ", - [Y,Mon,D,H,Min,S,Micros] - )), - {[{timestamp, ISO8601} | EventProps]} - end, Job#job.history), - {BaseID, Ext} = Job#job.id, - Pid = case Job#job.pid of - undefined -> - null; - P when is_pid(P) -> - ?l2b(pid_to_list(P)) - end, - [{[ - {id, iolist_to_binary([BaseID, Ext])}, - {pid, Pid}, - {source, iolist_to_binary(Source)}, - {target, iolist_to_binary(Target)}, - {database, Rep#rep.db_name}, - {user, (Rep#rep.user_ctx)#user_ctx.name}, - {doc_id, Rep#rep.doc_id}, - {history, History}, - {node, node()} - ]} | Acc] + [job_ejson(Job) | Acc] end, [], couch_replicator_scheduler). +-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}. +job(JobId) -> + case job_by_id(JobId) of + {ok, Job} -> + {ok, job_ejson(Job)}; + Error -> + Error + end. + + crash_reason_json({_CrashType, Info}) when is_binary(Info) -> Info; crash_reason_json(Reason) when is_binary(Reason) -> @@ -884,7 +899,7 @@ latest_crash_timestamp_test_() -> last_started_test_() -> - [?_assertEqual({0, R, 0}, last_started(job(H))) || {R, H} <- [ + [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [ {0, [added()]}, {0, [crashed(1)]}, {1, [started(1)]}, @@ -895,9 +910,9 @@ last_started_test_() -> oldest_job_first_test() -> - J0 = job([crashed()]), - J1 = job([started(1)]), - J2 = job([started(2)]), + J0 = testjob([crashed()]), + J1 = testjob([started(1)]), + J2 = testjob([started(2)]), Sort = fun(Jobs) -> lists:sort(fun oldest_job_first/2, Jobs) end, ?assertEqual([], Sort([])), ?assertEqual([J1], Sort([J1])), @@ -1312,7 +1327,7 @@ oneshot_running(Id) when is_integer(Id) -> }. -job(Hist) when is_list(Hist) -> +testjob(Hist) when is_list(Hist) -> #job{history = Hist}.