couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 10/25: Simplify worker vs. indexer distinction
Date Tue, 23 Jul 2019 20:13:19 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 14321badcfae21f428e63024b546f671f07b9355
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Wed Jul 17 13:26:31 2019 -0500

    Simplify worker vs. indexer distinction
    
    This just turns an indexer into a job handler and removes the worker
    concept entirely. couch_views_server just starts `max_workers` indexer
    processes that each wait for a job to process. Once processing is
    finished the indexer exits and couch_views_server spawns a new indexer
    to replace it.
---
 src/couch_views/src/couch_views_indexer.erl | 270 ++++++++++++++--------------
 src/couch_views/src/couch_views_worker.erl  |  44 -----
 2 files changed, 132 insertions(+), 182 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index e9f0b41..1a84116 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -13,55 +13,106 @@
 -module(couch_views_indexer).
 
 -export([
-    update/2,
-    update/4,
-
-    % For tests
-    map_docs/2,
-    write_doc/4
+    spawn_link/0
 ]).
 
 
--include("couch_views.hrl").
+-export([
+    init/0
+]).
+
+-include_lib("couch_views/include/couch_views.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("fabric/src/fabric2.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
-% TODO: 
+% TODO:
 %  * Handle timeouts of transaction and other errors
 
-update(Db, Mrst) ->
-    Noop = fun (_) -> ok end,
-    update(Db, Mrst, Noop, []).
-
-
-update(#{} = Db, Mrst, ProgressCallback, ProgressArgs)
-        when is_function(ProgressCallback, 6) ->
-    try
-        Seq = couch_views_fdb:get_update_seq(Db, Mrst),
-        State = #{
-            since_seq => Seq,
-            count => 0,
-            limit => config:get_integer("couch_views", "change_limit", 100),
-            doc_acc => [],
-            last_seq => Seq,
-            callback => ProgressCallback,
-            callback_args => ProgressArgs,
-            mrst => Mrst
-        },
-        update_int(Db, State)
-    catch error:database_does_not_exist ->
-        #{db_prefix := DbPrefix} = Db,
-        couch_log:notice("couch_views_indexer stopped"
-        "- ~p database does not exist", [DbPrefix])
-    end.
+
+spawn_link() ->
+    proc_lib:spawn_link(?MODULE, init, []).
+
+
+init() ->
+    {ok, Job, Data} = couch_jobs:accept(?INDEX_JOB_TYPE, #{}),
+
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"sig">> := Sig
+    } = Data,
+
+    {ok, Db} = fabric2_db:open(DbName, []),
+    {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+
+    if Mrst#mrst.sig == Sig -> ok; true ->
+        couch_jobs:finish(Job, Data#{error => mismatched_signature}),
+        exit(normal)
+    end,
+
+    State = #{
+        tx_db => undefined,
+        db_seq => undefined,
+        view_seq => undefined,
+        last_seq => undefined,
+        count => 0,
+        limit => num_changes(),
+        doc_acc => [],
+        design_opts => Mrst#mrst.design_opts
+    },
+
+    update(Db, Mrst, State).
+
+
+update(#{} = Db, Mrst0, State0) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        % In the first iteration of update we need
+        % to populate our db and view sequences
+        State1 = case State0 of
+            #{db_seq := undefined} ->
+                State0#{
+                    tx_db := TxDb,
+                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst)
+                };
+            _ ->
+                State0#{
+                    tx_db := TxDb
+                }
+        end,
+
+        {ok, State2} = fold_changes(State1),
+
+        #{
+            count := Count,
+            limit := Limit,
+            doc_acc := DocAcc,
+            last_seq := LastSeq
+        } = State2,
+
+        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+        write_docs(Db, Mrst1, MappedResults, State2),
+
+        case Count < Limit of
+            true ->
+                report_progress(State2, finished);
+            false ->
+                report_progress(State2, update),
+                State3 = maps:merge(FinalState, #{
+                    count => 0,
+                    doc_acc => [],
+                    db_seq => LastSeq,
+                    last_seq => 0,
+                    mrst => Mrst1
+                }),
+
+    end).
 
 
 update_int(#{} = Db, State) ->
-    {ok, FinalState} = fabric2_fdb:transactional(Db, fun(TxDb) ->
-        State1 = maps:put(tx_db, TxDb, State),
-        fold_changes(State1)
-    end),
+
 
     #{
         count := Count,
@@ -73,8 +124,8 @@ update_int(#{} = Db, State) ->
         mrst := Mrst
     } = FinalState,
 
-    {MappedResults, Mrst1} = map_docs(Mrst, DocAcc),
-    write_docs(Db, Mrst1, MappedResults, FinalState),
+    {MappedDocs, Mrst1} = map_docs(Mrst, DocAcc),
+    write_docs(Db, Mrst1, MappedDocs, FinalState),
 
     case Count < Limit of
         true ->
@@ -94,13 +145,13 @@ update_int(#{} = Db, State) ->
 
 fold_changes(State) ->
     #{
-        since_seq := SinceSeq,
+        view_seq := SinceSeq,
         limit := Limit,
         tx_db := TxDb
     } = State,
 
-    fabric2_db:fold_changes(TxDb, SinceSeq,
-        fun process_changes/2, State, [{limit, Limit}]).
+    Fun = fun process_changes/2,
+    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
 
 
 process_changes(Change, Acc) ->
@@ -108,7 +159,7 @@ process_changes(Change, Acc) ->
         doc_acc := DocAcc,
         count := Count,
         tx_db := TxDb,
-        mrst := Mrst
+        design_opts := DesignOpts
     } = Acc,
 
     #{
@@ -117,8 +168,7 @@ process_changes(Change, Acc) ->
         deleted := Deleted
     } = Change,
 
-    IncludeDesign = lists:keymember(<<"include_design">>, 1,
-        Mrst#mrst.design_opts),
+    IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
 
     Acc1 = case {Id, IncludeDesign} of
         {<<"_design/", _/binary>>, false} ->
@@ -126,16 +176,13 @@ process_changes(Change, Acc) ->
             maps:merge(Acc, #{
                 count => Count + 1,
                 last_seq => LastSeq
-                });
+            });
         _ ->
-
             % Making a note here that we should make fetching all the docs
             % a parallel fdb operation
-            Doc = if Deleted -> []; true ->
-                case fabric2_db:open_doc(TxDb, Id) of
-                    {ok, Doc0} -> Doc0;
-                    {not_found, _} -> []
-                end
+            {ok, Doc} = case Deleted of
+                true -> {ok, []};
+                false -> fabric2_db:open_doc(TxDb, Id)
             end,
 
             Change1 = maps:put(doc, Doc, Change),
@@ -150,113 +197,60 @@ process_changes(Change, Acc) ->
 
 map_docs(Mrst, Docs) ->
     % Run all the non deleted docs through the view engine and
-    Mrst1 = get_query_server(Mrst),
+    Mrst1 = start_query_server(Mrst),
     QServer = Mrst1#mrst.qserver,
-
     MapFun = fun
         (#{deleted := true} = Change) ->
             maps:put(results, [], Change);
-
         (Change) ->
             #{doc := Doc} = Change,
             couch_stats:increment_counter([couchdb, mrview, map_doc]),
             {ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc),
             JsonResults = couch_query_servers:raw_to_ejson(RawResults),
-            ListResults = [[list_to_tuple(Res) || Res <- FunRs]
-                || FunRs <- JsonResults],
+            ListResults = lists:map(fun(ViewResults) ->
+                [list_to_tuple(Res) || Res <- ViewResults]
+            end, JsonResults),
             maps:put(results, ListResults, Change)
     end,
-    MappedResults = lists:map(MapFun, Docs),
-    {MappedResults, Mrst1}.
+    {Mrst1, lists:map(MapFun, Docs)}.
 
 
-start_query_server(#mrst{} = Mrst) ->
+write_docs(TxDb, Mrst, Docs, State) ->
     #mrst{
-        language=Language,
-        lib=Lib,
-        views=Views
+        views = Views,
+        sig = Sig
     } = Mrst,
-    Defs = [View#mrview.def || View <- Views],
-    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
-    Mrst#mrst{qserver=QServer}.
 
+    #{
+        last_seq := LastSeq
+    } = State,
 
-get_query_server(#mrst{} = Mrst) ->
-    case Mrst#mrst.qserver of
-        nil -> start_query_server(Mrst);
-        _ -> Mrst
-    end.
+    ViewIds = [View#mrview.id_num || View <- Views],
+
+    lists:foreach(fun(Doc) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+    end, Docs),
 
+    couch_views_fdb:update_view_seq(TxDb, Sig, LastSeq).
 
-write_docs(Db, Mrst, Docs, State) ->
+
+start_query_server(#mrst{} = Mrst) ->
     #mrst{
-        views = Views,
-        sig = Sig
+        language = Language,
+        lib = Lib,
+        views = Views
     } = Mrst,
+    Defs = [View#mrview.def || View <- Views],
+    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
+    Mrst#mrst{qserver = QServer}.
 
-    #{
-        callback := Cb,
-        callback_args := CallbackArgs
-    } = State,
 
-    IdxNames = lists:map(fun (View) ->
-        View#mrview.id_num
-    end, Views),
-
-    lists:foreach(fun (Doc) ->
-        #{sequence := Seq} = Doc,
-        fabric2_fdb:transactional(Db, fun(TxDb) ->
-            couch_views_fdb:update_view_seq(TxDb, Sig, Seq),
-            Cb(TxDb, update, CallbackArgs, Db, Mrst, Seq),
-            write_doc(TxDb, Sig, Doc, IdxNames)
-        end)
-    end, Docs).
-
-
-write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) ->
-    #{id := DocId} = Doc,
-    lists:foreach(fun (IdxName) ->
-        maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName)
-    end, ViewIds);
-
-write_doc(TxDb, Sig, Doc, ViewIds) ->
-    #{id := DocId, results := Results} = Doc,
-    lists:foreach(fun
-        ({IdxName, []}) ->
-            maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName);
-        ({IdxName, IdxResults}) ->
-            lists:foldl(fun (IdxResult, DocIdsCleared) ->
-                {IdxKey, _} = IdxResult,
-                OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig,
-                    DocId, IdxName),
-                IsAlreadyCleared = lists:member(DocId, DocIdsCleared),
-                case OldIdxKey == not_found orelse IsAlreadyCleared == true of
-                    true ->
-                        couch_views_fdb:set_id_index(TxDb, Sig, IdxName,
-                            DocId, IdxKey),
-                        couch_views_fdb:set_map_index_results(TxDb, Sig,
-                            IdxName, DocId, IdxResults);
-                    false ->
-                        couch_views_fdb:clear_id_index(TxDb, Sig,
-                            DocId, IdxName),
-                        couch_views_fdb:clear_map_index(TxDb, Sig, IdxName,
-                            DocId, OldIdxKey),
-                        couch_views_fdb:set_id_index(TxDb, Sig, DocId,
-                            IdxName, IdxKey),
-                        couch_views_fdb:set_map_index_results(TxDb, Sig,
-                            IdxName, DocId, IdxResults)
-                end,
-                [DocId | DocIdsCleared]
-            end, [], IdxResults)
-    end, lists:zip(ViewIds, Results)).
-
-
-maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) ->
-    OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig,
-        DocId, IdxName),
-    if OldIdxKey == not_found -> ok; true ->
-        couch_views_fdb:clear_id_index(TxDb, Sig,
-            DocId, IdxName),
-        couch_views_fdb:clear_map_index(TxDb, Sig, IdxName,
-            DocId, OldIdxKey)
+
+report_progress(State, UpdateType) ->
+    case UpdateType of
+        update ->
+            couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq);
+        finished ->
+            couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq)
     end.
+
diff --git a/src/couch_views/src/couch_views_worker.erl b/src/couch_views/src/couch_views_worker.erl
deleted file mode 100644
index fa641d5..0000000
--- a/src/couch_views/src/couch_views_worker.erl
+++ /dev/null
@@ -1,44 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views_worker).
-
--export([
-    start/2,
-    job_progress/6
-]).
-
-
-start(Job, JobData) ->
-    {ok, Db, Mrst} = get_indexing_info(JobData),
-    % maybe we should spawn here
-    couch_views_indexer:update(Db, Mrst, fun job_progress/6, Job).
-
-
-job_progress(Tx, Progress, Job, Db, Mrst, LastSeq) ->
-    case Progress of
-        update ->
-            couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq);
-        finished ->
-            couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq)
-    end.
-
-
-get_indexing_info(JobData) ->
-    #{
-        <<"db_name">> := DbName,
-        <<"ddoc_id">> := DDocId
-    } = JobData,
-    {ok, Db} = fabric2_db:open(DbName, []),
-    {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
-    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
-    {ok, Db, Mrst}.


Mime
View raw message