couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] branch 63012-scheduler updated: Implement fabric-based _scheduler/docs endpoint
Date Thu, 13 Apr 2017 18:28:57 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/63012-scheduler by this push:
       new  ec36f9f   Implement fabric-based _scheduler/docs endpoint
ec36f9f is described below

commit ec36f9f046f13f67f999c1497e75ddb367b45d41
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Thu Apr 13 14:12:37 2017 -0400

    Implement fabric-based _scheduler/docs endpoint
    
    Previously _scheduler/docs implementation was not optimal. All documents
    were fetched via rpc:multicall from each of the nodes.
    
    Switch implementation to mimic _all_docs behavior. The algorithm is roughly
    as follows:
    
     * chttpd endpoint:
       - parses query args like it does for any view query
       - parses states to filter by, states are kept in the `extra` query arg
    
     * Call is made to couch_replicator_fabric. This is equivalent to
       fabric:all_docs. Here the typical fabric / rexi setup is happening.
    
     * Fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is
       similar to fabric_rpc's all_docs handler. However it is a bit more intricate
       to handle both replication document in terminal state as well as those which
       are active.
    
       - Before emitting it queries the state of the document to see if it is in a
         terminal state. If it is, it filters it and decides if it should be
         emitted or not.
    
   - If the document state cannot be found from the document, it tries to
     fetch the active state from the local node's doc processor via a
     key-based lookup. If found, it can also filter it based on state and
     emit it or skip.
    
       - If the document cannot be found in the node's local doc processor ETS
         table, the row is emitted with a doc value of `undecided`. This will
         let the coordinator fetch the state by possibly querying other nodes's
         doc processors.
    
  * Coordinator then starts handling messages. This also mostly mimics all_docs.
    At this point the most interesting thing is handling `undecided` docs. If
    one is found, then `replicator:active_doc/2` is queried. There, all nodes
    where document shards live are queried. This is better than the previous
    implementation where all nodes were queried all the time.
    
  * The final work happens in `couch_replicator_httpd` where the emitting
    callback is. There, only the doc is emitted (not keys, rows, values).
    Another thing that happens is that the `Total` value is decremented to
    account for the always-present _design doc.
    
Because of this, a bunch of stuff was removed, including an extra view which
was built and managed by the previous implementation.
    
As a bonus, other view-related parameters such as skip and limit seem to
work out of the box and don't have to be implemented ad-hoc.
    
Also, most importantly, many thanks to Paul Davis for suggesting this approach.
    
    Jira: COUCHDB-3324
---
 src/chttpd/src/chttpd_misc.erl                     |  52 ++---
 src/couch_replicator/src/couch_replicator.erl      | 225 +++++++--------------
 .../src/couch_replicator_doc_processor.erl         |  15 +-
 src/couch_replicator/src/couch_replicator_docs.erl |   9 +-
 .../src/couch_replicator_fabric.erl                | 154 ++++++++++++++
 .../src/couch_replicator_fabric_rpc.erl            |  97 +++++++++
 .../src/couch_replicator_httpd.erl                 | 112 +++++++++-
 .../src/couch_replicator_js_functions.hrl          |  30 ---
 .../src/couch_replicator_utils.erl                 |  20 +-
 9 files changed, 486 insertions(+), 228 deletions(-)

diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 4c6090c..cc51bee 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -41,8 +41,6 @@
     send_chunk/2,start_chunked_response/3]).
 
 
--record(rep_docs_acc, {prepend, resp, count, skip, limit}).
-
 -define(DEFAULT_TASK_LIMIT, 100).
 -define(DEFAULT_DOCS_LIMIT, 100).
 -define(REPDB, <<"_replicator">>).
@@ -179,27 +177,22 @@ handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req)
             throw(not_found)
     end;
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
-    Limit = parse_int_param(Req, "limit", ?DEFAULT_DOCS_LIMIT, 0, infinity),
-    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
+    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
     States = parse_replication_state_filter(chttpd:qs_value(Req, "states")),
-    SkipStr = integer_to_list(Skip),
-    Preamble = ["{\r\n\"offset\": ", SkipStr, ",\r\n\"docs\": ["],
-    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], Preamble),
-    Fun = fun stream_doc_info_cb/2,
-    Acc = #rep_docs_acc{
-        prepend = "\r\n",
-        resp = Resp,
-        count = 0,
-        skip = Skip,
-        limit = Limit
+    VArgs1 = VArgs0#mrargs{
+        view_type = map,
+        include_docs = true,
+        reduce = false,
+        extra = [{filter_states, States}]
     },
-    Acc1 = couch_replicator:stream_active_docs_info(Fun, Acc, States),
-    Acc2 = couch_replicator:stream_terminal_docs_info(?REPDB, Fun, Acc1, States),
-    #rep_docs_acc{resp = Resp1, count = Total}  = Acc2,
-    TotalStr = integer_to_list(Total),
-    Postamble = ["\r\n],\r\n\"total\": ", TotalStr, "\r\n}\r\n"],
-    {ok, Resp2} = chttpd:send_delayed_chunk(Resp1, Postamble),
-    chttpd:end_delayed_json_response(Resp2);
+    VArgs2 = couch_mrview_util:validate_args(VArgs1),
+    Opts = [{user_ctx, Req#httpd.user_ctx}],
+    Max = chttpd:chunked_response_buffer_size(),
+    Db = ?REPDB,
+    Acc = #vacc{db=Db, req=Req, threshold=Max},
+    Cb = fun couch_replicator_httpd:docs_cb/2,
+    {ok, Res} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+    {ok, Res#vacc.resp};
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req)
->
     UserCtx = Req#httpd.user_ctx,
     case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
@@ -501,23 +494,6 @@ message_queues(Registered) ->
         {Name, Length}
     end, Registered).
 
-stream_doc_info_cb(Info, Acc) ->
-    #rep_docs_acc{
-        resp = Resp,
-        prepend = Pre,
-        count = Count,
-        skip = Skip,
-        limit = Limit
-    } = Acc,
-    case Count >= Skip andalso Count < (Skip + Limit) of
-    true ->
-        Chunk = [Pre, ?JSON_ENCODE(update_db_name(Info))],
-        {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
-        Acc#rep_docs_acc{resp = Resp1, prepend =  ",\r\n", count = Count + 1};
-    false ->
-        Acc#rep_docs_acc{count = Count + 1}
-    end.
-
 update_db_name({Props}) ->
     {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
     {[{database, normalize_db_name(DbName)} | Props1]}.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index e393f44..e99c9b9 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -15,17 +15,18 @@
 -export([
     replicate/2,
     ensure_rep_db_exists/0,
-    stream_active_docs_info/3,
-    stream_terminal_docs_info/4,
     replication_states/0,
     job/1,
-    doc/3
+    doc/3,
+    active_doc/2,
+    info_from_doc/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
 -include("couch_replicator_api_wrap.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
 -define(REPLICATION_STATES, [
@@ -44,10 +45,6 @@
 ]).
 
 
--type user_doc_cb() :: fun(({[_]}, any()) -> any()).
--type query_acc() :: {binary(), user_doc_cb(), any()}.
-
-
 -spec replicate({[_]}, #user_ctx{}) ->
     {ok, {continuous, binary()}} |
     {ok, {[_]}} |
@@ -150,98 +147,6 @@ replication_states() ->
     ?REPLICATION_STATES.
 
 
--spec stream_terminal_docs_info(binary(), user_doc_cb(), any(), [atom()]) ->
-    any().
-stream_terminal_docs_info(Db, Cb, UserAcc, States) ->
-    DDoc = <<"_replicator">>,
-    View = <<"terminal_states">>,
-    QueryCb = fun handle_replicator_doc_query/2,
-    Args = #mrargs{view_type = map, reduce = false},
-    Acc = {Db, Cb, UserAcc, States},
-    try fabric:query_view(Db, DDoc, View, QueryCb, Acc, Args) of
-    {ok, {Db, Cb, UserAcc1, States}} ->
-        UserAcc1
-    catch
-        error:database_does_not_exist ->
-            UserAcc;
-        error:{badmatch, {not_found, Reason}} ->
-            Msg = "Could not find _design/~s ~s view in replicator db ~s : ~p",
-            couch_log:error(Msg, [DDoc, View, Db, Reason]),
-            couch_replicator_docs:ensure_cluster_rep_ddoc_exists(Db),
-            timer:sleep(?DESIGN_DOC_CREATION_DELAY_MSEC),
-            stream_terminal_docs_info(Db, Cb, UserAcc, States)
-    end.
-
-
--spec stream_active_docs_info(user_doc_cb(), any(), [atom()]) -> any().
-stream_active_docs_info(Cb, UserAcc, States) ->
-    Nodes = lists:sort([node() | nodes()]),
-    stream_active_docs_info(Nodes, Cb, UserAcc, States).
-
-
--spec stream_active_docs_info([node()], user_doc_cb(), any(), [atom()]) ->
-    any().
-stream_active_docs_info([], _Cb, UserAcc, _States) ->
-    UserAcc;
-stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
-    case rpc:call(Node, couch_replicator_doc_processor, docs, [States]) of
-        {badrpc, Reason} ->
-            ErrMsg = "Could not get replicator docs from ~p. Error: ~p",
-            couch_log:error(ErrMsg, [Node, Reason]),
-            stream_active_docs_info(Nodes, Cb, UserAcc, States);
-        Results ->
-            UserAcc1 = lists:foldl(Cb, UserAcc, Results),
-            stream_active_docs_info(Nodes, Cb, UserAcc1, States)
-    end.
-
-
--spec handle_replicator_doc_query
-    ({row, [_]} , query_acc()) -> {ok, query_acc()};
-    ({error, any()}, query_acc()) -> {error, any()};
-    ({meta, any()}, query_acc()) -> {ok,  query_acc()};
-    (complete, query_acc()) -> {ok, query_acc()}.
-handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
-    DocId = get_value(id, Props),
-    DocStateBin = get_value(key, Props),
-    DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
-    MapValue = get_value(value, Props),
-    {Source, Target, StartTime, StateTime, StateInfo} = case MapValue of
-        [Src, Tgt, StartT, StateT, Info] ->
-            {Src, Tgt, StartT, StateT, Info};
-        _Other ->
-            % Handle the case where the view code was upgraded but new view code
-            % wasn't updated yet (before a _scheduler/docs request was made)
-            {null, null, null, null, null}
-    end,
-    % Set the error_count to 1 if failed. This is mainly for consistency as
-    % jobs from doc_processor and scheduler will have that value set
-    ErrorCount = case DocState of failed -> 1; _ -> 0 end,
-    case filter_replicator_doc_query(DocState, States) of
-        true ->
-            EjsonInfo = {[
-                {doc_id, DocId},
-                {database, Db},
-                {id, null},
-                {source, strip_url_creds(Source)},
-                {target, strip_url_creds(Target)},
-                {state, DocState},
-                {error_count, ErrorCount},
-                {info, StateInfo},
-                {last_updated, StateTime},
-                {start_time, StartTime}
-            ]},
-            {ok, {Db, Cb, Cb(EjsonInfo, UserAcc), States}};
-        false ->
-            {ok, {Db, Cb, UserAcc, States}}
-    end;
-handle_replicator_doc_query({error, Reason}, _Acc) ->
-    {error, Reason};
-handle_replicator_doc_query({meta, _Meta}, Acc) ->
-    {ok, Acc};
-handle_replicator_doc_query(complete, Acc) ->
-    {stop, Acc}.
-
-
 -spec strip_url_creds(binary() | {[_]}) -> binary().
 strip_url_creds(Endpoint) ->
     case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
@@ -252,13 +157,6 @@ strip_url_creds(Endpoint) ->
     end.
 
 
--spec filter_replicator_doc_query(atom(), [atom()]) -> boolean().
-filter_replicator_doc_query(_DocState, []) ->
-    true;
-filter_replicator_doc_query(State, States) when is_list(States) ->
-    lists:member(State, States).
-
-
 -spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
 job(JobId0) when is_binary(JobId0) ->
     JobId = couch_replicator_ids:convert(JobId0),
@@ -271,14 +169,31 @@ job(JobId0) when is_binary(JobId0) ->
     end.
 
 
--spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
-doc(RepDb, DocId, UserCtx) ->
-    {Res, _Bad} = rpc:multicall(couch_replicator_doc_processor, doc,
-        [RepDb, DocId]),
+-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+active_doc(DbName, DocId) ->
+    Nodes = try
+        lists:usort([Shard#shard.node || Shard <- mem3:shards(DbName)])
+    catch
+        % Might be a local database
+        error:database_does_not_exist ->
+            [node() | nodes()]
+    end,
+    {Res, _Bad} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc,
+        [DbName, DocId]),
     case [DocInfo || {ok, DocInfo} <- Res] of
-        [DocInfo| _] ->
+        [DocInfo | _] ->
             {ok, DocInfo};
         [] ->
+            {error, not_found}
+    end.
+
+
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    case active_doc(RepDb, DocId) of
+        {ok, DocInfo} ->
+            {ok, DocInfo};
+        {error, not_found} ->
             doc_from_db(RepDb, DocId, UserCtx)
     end.
 
@@ -287,49 +202,61 @@ doc(RepDb, DocId, UserCtx) ->
 doc_from_db(RepDb, DocId, UserCtx) ->
     case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
         {ok, Doc} ->
-            {Props} = couch_doc:to_json_obj(Doc, []),
-            Source = get_value(<<"source">>, Props),
-            Target = get_value(<<"target">>, Props),
-            State = get_value(<<"_replication_state">>, Props, null),
-            StateTime = get_value(<<"_replication_state_time">>, Props, null),
-            {StateInfo, ErrorCount, StartTime} = case State of
-                <<"completed">> ->
-                    {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
-                    case lists:keytake(<<"start_time">>, 1, InfoP) of
-                        {value, {_, Time}, InfoP1} ->
-                            {{InfoP1}, 0, Time};
-                        false ->
-                            case lists:keytake(start_time, 1, InfoP) of
-                                {value, {_, Time}, InfoP1} ->
-                                    {{InfoP1}, 0, Time};
-                            false ->
-                                    {{InfoP}, 0, null}
-                            end
-                    end;
-                <<"failed">> ->
-                    Info = get_value(<<"_replication_state_reason">>, Props,
-                        null),
-                    {Info, 1, StateTime};
-                _OtherState ->
-                    {null, 0, null}
-            end,
-            {ok, {[
-                {doc_id, DocId},
-                {database, RepDb},
-                {id, null},
-                {source, strip_url_creds(Source)},
-                {target, strip_url_creds(Target)},
-                {state, State},
-                {error_count, ErrorCount},
-                {info, StateInfo},
-                {start_time, StartTime},
-                {last_updated, StateTime}
-            ]}};
+            {ok, info_from_doc(RepDb, couch_doc:to_json_obj(Doc, []))};
          {not_found, _Reason} ->
             {error, not_found}
     end.
 
 
+-spec info_from_doc(binary(), {[_]}) -> {[_]}.
+info_from_doc(RepDb, {Props}) ->
+    DocId = get_value(<<"_id">>, Props),
+    Source = get_value(<<"source">>, Props),
+    Target = get_value(<<"target">>, Props),
+    State = state_atom(get_value(<<"_replication_state">>, Props, null)),
+    StateTime = get_value(<<"_replication_state_time">>, Props, null),
+    {StateInfo, ErrorCount, StartTime} = case State of
+        completed ->
+            {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
+            case lists:keytake(<<"start_time">>, 1, InfoP) of
+                {value, {_, Time}, InfoP1} ->
+                    {{InfoP1}, 0, Time};
+                false ->
+                    case lists:keytake(start_time, 1, InfoP) of
+                        {value, {_, Time}, InfoP1} ->
+                            {{InfoP1}, 0, Time};
+                        false ->
+                            {{InfoP}, 0, null}
+                        end
+            end;
+        failed ->
+            Info = get_value(<<"_replication_state_reason">>, Props, null),
+            {Info, 1, StateTime};
+        _OtherState ->
+            {null, 0, null}
+    end,
+    {[
+        {doc_id, DocId},
+        {database, RepDb},
+        {id, null},
+        {source, strip_url_creds(Source)},
+        {target, strip_url_creds(Target)},
+        {state, State},
+        {error_count, ErrorCount},
+        {info, StateInfo},
+        {start_time, StartTime},
+        {last_updated, StateTime}
+     ]}.
+
+
+state_atom(<<"triggered">>) ->
+    triggered;  % This handles a legacy case were document wasn't converted yet
+state_atom(State) when is_binary(State) ->
+    erlang:binary_to_existing_atom(State, utf8);
+state_atom(State) when is_atom(State) ->
+    State.
+
+
 -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
 check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
     case couch_replicator_scheduler:rep_state(RepId) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 40c6cfe..45d5b3b 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -38,6 +38,7 @@
 -export([
     docs/1,
     doc/2,
+    doc_lookup/3,
     update_docs/0,
     get_worker_ref/1,
     notify_cluster_event/2
@@ -45,6 +46,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -import(couch_replicator_utils, [
     get_json_value/2,
@@ -197,7 +199,7 @@ start_link() ->
 
 
 init([]) ->
-    ?MODULE = ets:new(?MODULE, [ordered_set, named_table, {keypos, #rdoc.id}]),
+    ?MODULE = ets:new(?MODULE, [set, named_table, {keypos, #rdoc.id}]),
     couch_replicator_clustering:link_cluster_event_listener(?MODULE,
         notify_cluster_event, [self()]),
     {ok, nil}.
@@ -531,6 +533,17 @@ doc(Db, DocId) ->
     end.
 
 
+-spec doc_lookup(binary(), binary(), integer()) ->
+    {ok, {[_]}} | {error, not_found}.
+doc_lookup(Db, DocId, HealthThreshold) ->
+    case ets:lookup(?MODULE, {Db, DocId}) of
+        [#rdoc{} = RDoc] ->
+            {ok, ejson_doc(RDoc, HealthThreshold)};
+        [] ->
+            {error, not_found}
+    end.
+
+
 -spec ejson_state_info(binary() | nil) -> binary() | null.
 ejson_state_info(nil) ->
     null;
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index c8c677a..cce4ce2 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -195,17 +195,10 @@ compare_ejson(EJson1, EJson2) ->
 
 -spec replication_design_doc_props(binary()) -> [_].
 replication_design_doc_props(DDocId) ->
-    TerminalViewEJson = {[
-                {<<"map">>, ?REP_DB_TERMINAL_STATE_VIEW_MAP_FUN},
-                {<<"reduce">>, <<"_count">>}
-            ]},
     [
         {<<"_id">>, DDocId},
         {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN},
-        {<<"views">>, {[
-            {<<"terminal_states">>, TerminalViewEJson}
-        ]}}
+        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
     ].
 
 
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
new file mode 100644
index 0000000..8634e76
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -0,0 +1,154 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric).
+
+-export([
+   docs/5
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+docs(DbName, Options, QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+           Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    docs_int(DbName, Workers, QueryArgs, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, NewState} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers, waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "replicator docs"
+                ),
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+
+docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0,
+        update_seq = nil
+    },
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    end.
+
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+    Tot = couch_util:get_value(total, Meta0, 0),
+    Off = couch_util:get_value(offset, Meta0, 0),
+    #collector{
+        callback = Callback,
+        counters = Counters0,
+        total_rows = Total0,
+        offset = Offset0,
+        user_acc = AccIn
+    } = State,
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
+    end;
+
+handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
+    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+    case maybe_fetch_and_filter_doc(Id, Doc, State) of
+        {[_ | _]} = NewDoc ->
+            Row = Row0#view_row{doc = NewDoc},
+            Dir = Args#mrargs.direction,
+            Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
+            Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+            State1 = State#collector{rows=Rows, counters=Counters1},
+            fabric_view:maybe_send_row(State1);
+        skip ->
+            {ok, State}
+    end;
+
+handle_message(complete, Worker, State) ->
+    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+    fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+merge_row(fwd, Row, Rows) ->
+    lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+    lists:rkeymerge(#view_row.id, [Row], Rows).
+
+
+maybe_fetch_and_filter_doc(Id, undecided, State) ->
+    #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
+    FilterStates = proplists:get_value(filter_states, Extra),
+    case couch_replicator:active_doc(DbName, Id) of
+        {ok, {Props} = DocInfo} ->
+            DocState = couch_util:get_value(state, Props),
+            couch_replicator_utils:filter_state(DocState, FilterStates, DocInfo);
+        {error, not_found} ->
+            skip  % could have been deleted
+    end;
+maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
+    Doc.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
new file mode 100644
index 0000000..d67f875
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric_rpc).
+
+-export([
+   docs/3
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+docs(DbName, Options, Args0) ->
+    set_io_priority(DbName, Options),
+    #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
+    FilterStates = proplists:get_value(filter_states, Extra),
+    Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    {ok, Db} = couch_db:open_int(DbName, Options),
+    Acc = {DbName, FilterStates, HealthThreshold},
+    couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
+
+
+docs_cb({meta, Meta}, Acc) ->
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
+docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
+    Id = couch_util:get_value(id, Row),
+    Doc = couch_util:get_value(doc, Row),
+    ViewRow = #view_row{
+        id = Id,
+        key = couch_util:get_value(key, Row),
+        value = couch_util:get_value(value, Row)
+    },
+    case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
+        skip ->
+            ok;
+        Other ->
+            ok = rexi:stream2(ViewRow#view_row{doc = Other})
+    end,
+    {ok, Acc};
+docs_cb(complete, Acc) ->
+    ok = rexi:stream_last(complete),
+    {ok, Acc}.
+
+
+set_io_priority(DbName, Options) ->
+    case lists:keyfind(io_priority, 1, Options) of
+    {io_priority, Pri} ->
+        erlang:put(io_priority, Pri);
+    false ->
+        erlang:put(io_priority, {interactive, DbName})
+    end.
+
+
+%% Get the state of the replication document. If it is found and has a terminal
+%% state then it can be filtered and either included in the results or skipped.
+%% If it is not in a terminal state, look it up in the local doc processor ETS
+%% table. If it is there then filter by state. If it is not found there either
+%% then mark it as `undecided` and let the coordinator try to fetch it. The
+%% The idea is to do as much work as possible locally and leave the minimum
+%% amount of work for the coordinator.
+rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
+    skip;
+rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
+    DbName = mem3:dbname(Shard),
+    DocInfo = couch_replicator:info_from_doc(DbName, Doc),
+    case get_doc_state(DocInfo) of
+        null ->
+            % Fetch from local doc processor. If there, filter by state.
+            % If not there, mark as undecided. Let coordinator figure it out.
+            case couch_replicator_doc_processor:doc_lookup(Shard, Id,
+                    HealthThreshold) of
+                {ok, EtsInfo} ->
+                    State = get_doc_state(EtsInfo),
+                    couch_replicator_utils:filter_state(State, States, EtsInfo);
+                {error, not_found} ->
+                    undecided
+            end;
+        OtherState when is_atom(OtherState) ->
+            couch_replicator_utils:filter_state(OtherState, States, DocInfo)
+    end.
+
+
+get_doc_state({Props})->
+    couch_util:get_value(state, Props).
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 6b43472..834524d 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -13,6 +13,12 @@
 -module(couch_replicator_httpd).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    handle_req/1,
+    docs_cb/2
+]).
 
 -import(couch_httpd, [
     send_json/2,
@@ -24,7 +30,75 @@
     to_binary/1
 ]).
 
--export([handle_req/1]).
+
+%% Handle replicator docs streaming
+
+%% Streaming view callback used when serving _scheduler/docs. Receives
+%% {meta, _}, {row, _}, complete and {error, _} messages and incrementally
+%% builds a chunked JSON response, buffering output in the #vacc{} record
+%% until maybe_flush_response/3 decides to flush.
+docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+    % Error before the response was started: send a plain error response.
+    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+    {ok, Acc#vacc{resp=Resp}};
+
+
+docs_cb(complete, #vacc{resp=undefined}=Acc) ->
+    % Nothing in view
+    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+    {ok, Acc#vacc{resp=Resp}};
+
+
+docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
+    %% Start response
+    % First real message: open the delayed response, then re-dispatch Msg.
+    Headers = [],
+    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+    docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+
+docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+    % Error after the response started: report it on the delayed response.
+    {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+    {ok, Acc#vacc{resp=Resp1}};
+
+
+docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+    % Finish view output and possibly end the response
+    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+    case Acc#vacc.should_close of
+        true ->
+            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+            {ok, Acc#vacc{resp=Resp2}};
+        _ ->
+            % Response stays open (started elsewhere); reset accumulator
+            % state so another object can be streamed on the same response.
+            {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+                prepend=",\r\n", buffer=[], bufsize=0}}
+    end;
+
+
+docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+    % Sending metadata as we've not sent it or any row yet
+    Parts = case couch_util:get_value(total, Meta) of
+        undefined -> [];
+        Total -> [io_lib:format("\"total\":~p", [adjust_total(Total)])]
+    end ++ case couch_util:get_value(offset, Meta) of
+        undefined -> [];
+        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+    end ++ ["\"docs\":["],
+    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+    {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+
+docs_cb({meta, _Meta}, #vacc{}=Acc) ->
+    %% ignore metadata
+    {ok, Acc};
+
+
+docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+    %% sorted=false and row arrived before meta
+    % Adding another row
+    Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+
+docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+    % Adding another row
+    Chunk = [prepend_val(Acc), row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
 
 
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
@@ -63,3 +137,39 @@ validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
     validate_rep_props(Rest);
 validate_rep_props([_|Rest]) ->
     validate_rep_props(Rest).
+
+
+%% Separator to emit before the next JSON fragment. An undefined prepend
+%% means nothing has been written yet, so return the empty string.
+prepend_val(#vacc{prepend=Prepend}) ->
+    case Prepend of
+        undefined ->
+            "";
+        _ ->
+            Prepend
+    end.
+
+
+%% Add Data (of size Len) to the response buffer. If the buffer is non-empty
+%% and adding Data would push it past the threshold, flush the existing
+%% buffer as a chunk first and start a new buffer containing only Data;
+%% otherwise just append. Always returns {ok, UpdatedAcc}.
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+        when Size > 0 andalso (Size + Len) > Max ->
+    #vacc{buffer = Buffer, resp = Resp} = Acc,
+    {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+    {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+    % Buffer has room (or is empty): accumulate as an iolist, no copy.
+    #vacc{buffer = Buf, bufsize = Size} = Acc0,
+    Acc = Acc0#vacc{
+        prepend = ",\r\n",
+        buffer = [Buf | Data],
+        bufsize = Size + Len
+    },
+    {ok, Acc}.
+
+
+%% Encode a single view row as JSON. Only the row's `doc` value is emitted;
+%% the key/id parts of the row are intentionally dropped.
+row_to_json(Row) ->
+    Doc = couch_util:get_value(doc, Row),
+    ?JSON_ENCODE(Doc).
+
+
+%% Adjust Total as there is an automatically created validation design doc
+%% which should not be counted among the replication docs. Clamp at zero so
+%% the reported total can never go negative.
+adjust_total(Total) when is_integer(Total), Total > 0 ->
+    Total - 1;
+adjust_total(Total) when is_integer(Total) ->
+    0.
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
index 0c78b90..9b11e8a 100644
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -170,33 +170,3 @@
         }
     }
 ">>).
-
-
--define(REP_DB_TERMINAL_STATE_VIEW_MAP_FUN, <<"
-    function(doc) {
-        state = doc._replication_state;
-        if (state === 'failed') {
-            source = doc.source;
-            target = doc.target;
-            last_updated = doc._replication_state_time;
-            state_reason = doc._replication_state_reason;
-            emit('failed', [source, target, last_updated, last_updated, state_reason]);
-        } else if (state === 'completed') {
-            source = doc.source;
-            target = doc.target;
-            last_updated = doc._replication_state_time;
-            stats = doc._replication_stats;
-            start_time = stats.start_time;
-            info = {
-                'changes_pending': stats['changes_pending'],
-                'checkpointed_source_seq': stats['checkpointed_source_seq'],
-                'doc_write_failures': stats['doc_write_failures'],
-                'docs_read': stats['docs_read'],
-                'docs_written': stats['docs_written'],
-                'missing_revisions_found': stats['missing_revisions_found'],
-                'revisions_checked': stats['revisions_checked']
-            }
-            emit('completed', [source, target, start_time, last_updated, info]);
-        }
-    }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e2ae4fb..05836d4 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -25,7 +25,8 @@
    get_json_value/2,
    get_json_value/3,
    pp_rep_id/1,
-   iso8601/1
+   iso8601/1,
+   filter_state/3
 ]).
 
 -export([
@@ -145,3 +146,20 @@ iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
     Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
+
+
+%% Filter replication info ejson by state provided. If it matches return
+%% the input value, if it doesn't return 'skip'. This is used from replicator
+%% fabric coordinator and worker.
+-spec filter_state(atom(), [atom()], {[_ | _]}) -> {[_ | _]} | skip.
+filter_state(null = _State, _States, _Info) ->
+    % A null (unknown) state can never match a requested state.
+    skip;
+filter_state(_ = _State, [] = _States, Info) ->
+    % An empty state list means no filtering: pass everything through.
+    Info;
+filter_state(State, States, Info) ->
+    case lists:member(State, States) of
+        true ->
+            Info;
+        false ->
+            skip
+    end.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message