couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] 09/09: Add `_scheduler/{jobs,docs}` API endpoints
Date Wed, 05 Apr 2017 19:08:55 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit c2d381c48eb4f3cdf38012bcda78909b262d9c17
Author: Benjamin Bastian <benjamin.bastian@gmail.com>
AuthorDate: Fri Mar 10 13:06:04 2017 -0800

    Add `_scheduler/{jobs,docs}` API endpoints
    
    The `_scheduler/docs` endpoint provides a view of all
    replicator docs which have been seen by the scheduler. This endpoint
    includes useful information such as the state of the replication and the
    coordinator node.
    
    The `_scheduler/jobs` endpoint provides a view of all replications
    managed by the scheduler. This endpoint includes more information on the
    replication than the `_scheduler/docs` endpoint, including the history
    of state transitions of the replication.
    
    Jira: COUCHDB-3324
---
 src/chttpd/src/chttpd_httpd_handlers.erl |   1 +
 src/chttpd/src/chttpd_misc.erl           | 127 ++++++++++++++++++++++++++++++-
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index b91aae9..8d2c280 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>)     -> fun chttpd_misc:handle_favicon_req/1;
 url_handler(<<"_utils">>)          -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>)        -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_active_tasks">>)   -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_scheduler">>)      -> fun chttpd_misc:handle_scheduler_req/1;
 url_handler(<<"_node">>)           -> fun chttpd_misc:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
 url_handler(<<"_replicate">>)      -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index cfeeb3f..b39a85c 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -21,13 +21,16 @@
     handle_reload_query_servers_req/1,
     handle_system_req/1,
     handle_task_status_req/1,
+    handle_scheduler_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
     handle_uuids_req/1,
     handle_welcome_req/1,
     handle_welcome_req/2,
-    get_stats/0
+    get_stats/0,
+    parse_int_param/5,
+    parse_replication_state_filter/1
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -37,6 +40,13 @@
     [send_json/2,send_json/3,send_method_not_allowed/2,
     send_chunk/2,start_chunked_response/3]).
 
+
+-record(rep_docs_acc, {prepend, resp, count, skip, limit}). % accumulator threaded through stream_doc_info_cb/2 while streaming _scheduler/docs
+
+-define(DEFAULT_TASK_LIMIT, 100). % default "limit" query value for _scheduler/jobs
+-define(DEFAULT_DOCS_LIMIT, 100). % default "limit" query value for _scheduler/docs
+-define(REPDB, <<"_replicator">>). % database name handed to the couch_replicator doc lookups
+
 % httpd global handlers
 
 handle_welcome_req(Req) ->
@@ -150,6 +160,57 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) -> % GET _scheduler/jobs: paginated listing of scheduler jobs gathered via rpc:multicall
+    Limit = parse_int_param(Req, "limit", ?DEFAULT_TASK_LIMIT, 0, infinity),
+    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+    Flatlist = lists:concat([R || R <- Replies, is_list(R)]), % a failed call shows up as {badrpc, _} rather than a list; skip it, as _BadNodes already is
+    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist), % order by id; the pattern requires id to be the first property of every job
+    Total = length(Sorted),
+    Offset = min(Skip, Total), % clamp so the reported offset never exceeds total
+    Sublist = lists:sublist(Sorted, Offset+1, Limit), % lists:sublist/3 positions are 1-based
+    Sublist1 = [update_db_name(Task) || Task <- Sublist],
+    send_json(Req, {[{total, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) -> % GET _scheduler/jobs/JobId: a single job
+    case couch_replicator:job(JobId) of
+        {ok, JobInfo} ->
+            send_json(Req, update_db_name(JobInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) -> % GET _scheduler/docs: streamed, paginated listing of replicator docs
+    Limit = parse_int_param(Req, "limit", ?DEFAULT_DOCS_LIMIT, 0, infinity),
+    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
+    States = parse_replication_state_filter(chttpd:qs_value(Req, "states")), % [] is the wildcard (no filtering) value
+    SkipStr = integer_to_list(Skip),
+    Preamble = ["{\r\n\"offset\": ", SkipStr, ",\r\n\"docs\": ["], % JSON framing is written by hand because the body is sent in chunks
+    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], Preamble),
+    Fun = fun stream_doc_info_cb/2,
+    Acc = #rep_docs_acc{
+        prepend = "\r\n", % no comma separator before the first emitted entry
+        resp = Resp,
+        count = 0,
+        skip = Skip,
+        limit = Limit
+    },
+    Acc1 = couch_replicator:stream_active_docs_info(Fun, Acc, States),
+    Acc2 = couch_replicator:stream_terminal_docs_info(?REPDB, Fun, Acc1, States), % same accumulator, so skip/limit span both sources
+    #rep_docs_acc{resp = Resp1, count = Total}  = Acc2, % count includes every doc the callback saw, not only those emitted
+    TotalStr = integer_to_list(Total),
+    Postamble = ["\r\n],\r\n\"total\": ", TotalStr, "\r\n}\r\n"],
+    {ok, Resp2} = chttpd:send_delayed_chunk(Resp1, Postamble),
+    chttpd:end_delayed_json_response(Resp2);
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) -> % GET _scheduler/docs/DocId: a single replicator doc
+    UserCtx = Req#httpd.user_ctx,
+    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of % caller's roles are passed along; presumably for access checks (confirm)
+        {ok, DocInfo} ->
+            send_json(Req, DocInfo);
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(Req) -> % every other request, including GETs on unrecognised sub-paths
+    send_method_not_allowed(Req, "GET,HEAD"). % NOTE(review): unknown GET paths also get 405 here; not_found may be more accurate
+
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
@@ -439,3 +500,67 @@ message_queues(Registered) ->
         {Type, Length} = process_info(whereis(Name), Type),
         {Name, Length}
     end, Registered).
+
+stream_doc_info_cb(Info, Acc) -> % per-doc callback: sends Info as a chunk only when it falls inside the skip/limit window
+    #rep_docs_acc{
+        resp = Resp,
+        prepend = Pre,
+        count = Count,
+        skip = Skip,
+        limit = Limit
+    } = Acc,
+    case Count >= Skip andalso Count < (Skip + Limit) of % Count is how many docs have been seen before this one
+    true ->
+        Chunk = [Pre, ?JSON_ENCODE(update_db_name(Info))],
+        {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
+        Acc#rep_docs_acc{resp = Resp1, prepend =  ",\r\n", count = Count + 1}; % later entries get a comma separator in front
+    false ->
+        Acc#rep_docs_acc{count = Count + 1} % outside the window: counted but not sent
+    end.
+
+update_db_name({Props}) -> % rewrites the database property via normalize_db_name/1 and moves it to the front of the property list
+    {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props), % fails with badmatch when there is no database property
+    {[{database, normalize_db_name(DbName)} | Props1]}.
+
+normalize_db_name(<<"shards/", _/binary>> = DbName) -> % names beginning with "shards/" are mapped through mem3:dbname/1
+    mem3:dbname(DbName); % presumably yields the clustered database name; confirm against mem3
+normalize_db_name(DbName) ->
+    DbName. % any other value is returned unchanged
+
+parse_replication_state_filter(undefined) -> % turns the "states" query value into a list of state atoms
+    [];  % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) -> % States is a comma-separated string; matching is case-insensitive
+    AllStates = couch_replicator:replication_states(),
+    StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
+    AtomStates = try
+        [list_to_existing_atom(S) || S <- StrStates] % existing atoms only, so request input cannot create new atoms
+    catch error:badarg ->
+        Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    AllSet = sets:from_list(AllStates),
+    StatesSet = sets:from_list(AtomStates),
+    Diff = sets:to_list(sets:subtract(StatesSet, AllSet)), % requested atoms that exist but are not replication states
+    case Diff of
+    [] ->
+        AtomStates; % returned in request order, duplicates included
+    _ ->
+        Args = [Diff, AllStates],
+        Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+parse_int_param(Req, Param, Default, Min, Max) -> % reads query parameter Param as an integer in [Min, Max]; Default applies when it is absent
+    IntVal = try
+        list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default))) % Default goes through the same parse path as a supplied value
+    catch error:badarg ->
+        Msg1 = io_lib:format("~s must be an integer", [Param]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    case IntVal >= Min andalso IntVal =< Max of % with Max = infinity the upper bound always holds: numbers sort before atoms
+    true ->
+        IntVal;
+    false ->
+        Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message