couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 15/25: Fix compiler errors
Date Tue, 23 Jul 2019 20:13:24 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4dbf745b913b0879d8ca5c02cd9e72a2a0c08ca6
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Thu Jul 18 15:05:00 2019 -0500

    Fix compiler errors
---
 src/couch_views/src/couch_views.erl          | 18 +++---
 src/couch_views/src/couch_views_encoding.erl | 10 +--
 src/couch_views/src/couch_views_fdb.erl      | 81 ++++++++++++-----------
 src/couch_views/src/couch_views_indexer.erl  | 97 ++++++++++++----------------
 src/couch_views/src/couch_views_jobs.erl     | 29 +++++----
 src/couch_views/src/couch_views_reader.erl   | 19 +++---
 src/couch_views/src/couch_views_server.erl   | 30 +--------
 src/couch_views/src/couch_views_util.erl     |  1 +
 8 files changed, 127 insertions(+), 158 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7deb54d..e10675b 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -31,13 +31,12 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
 
     #mrst{
-        views = Views,
-        language = Lang
+        views = Views
     } = Mrst,
 
+    View = get_view(ViewName, Views),
     QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views),
     QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1),
-    VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views),
     case is_reduce_view(QueryArgs2) of
         true -> throw({not_implemented});
         false -> ok
@@ -72,15 +71,18 @@ maybe_update_view(Db, Mrst, _Args) ->
     end),
 
     if WaitSeq == ready -> ok; true ->
-        couch_views_jobs:build_view(Db, Mrst, DbSeq)
+        couch_views_jobs:build_view(Db, Mrst, WaitSeq)
     end.
 
 
+get_view(ViewName, Views) ->
+    {value, View} = lists:search(fun(View) ->
+        lists:member(ViewName, View#mrview.map_names)
+    end, Views),
+    View.
+
+
 is_reduce_view(#mrargs{view_type = ViewType}) ->
     ViewType =:= red;
 is_reduce_view({Reduce, _, _}) ->
     Reduce =:= red.
-
-
-remove_ununsed_values(Args) ->
-    maps:filter(fun (_, V) -> V /= undefined end, Args).
diff --git a/src/couch_views/src/couch_views_encoding.erl b/src/couch_views/src/couch_views_encoding.erl
index 9d3e3fc..9f76ea6 100644
--- a/src/couch_views/src/couch_views_encoding.erl
+++ b/src/couch_views/src/couch_views_encoding.erl
@@ -34,7 +34,7 @@ encode(X) ->
 
 
 encode(X, Type) when Type == key; Type == value ->
-    erlfdb_tuple:pack(encode_int(X, value))
+    erlfdb_tuple:pack(encode_int(X, value)).
 
 
 decode(Encoded) ->
@@ -60,7 +60,7 @@ encode_int(Num, value) when is_number(Num) ->
 encode_int(Bin, key) when is_binary(Bin) ->
     {?STRING, couch_util:get_sort_key(Bin)};
 
-encode_int(Bin, value) when is_bianry(Bin) ->
+encode_int(Bin, value) when is_binary(Bin) ->
     {?STRING, Bin};
 
 encode_int(List, Type) when is_list(List) ->
@@ -75,7 +75,7 @@ encode_int({Props}, Type) when is_list(Props) ->
         EV = encode_int(V, Type),
         {EK, EV}
     end, Props),
-    {?OBJECT, list_to_tuple(EncodedProps)}.
+    {?OBJECT, list_to_tuple(Encoded)}.
 
 
 decode_int({?NULL}) ->
@@ -98,8 +98,8 @@ decode_int({?LIST, List}) ->
 
 decode_int({?OBJECT, Object}) ->
     Props = lists:map(fun({EK, EV}) ->
-        K = decode_int(EncodedK),
-        V = decode_int(EncodedV),
+        K = decode_int(EK),
+        V = decode_int(EV),
         {K, V}
     end, tuple_to_list(Object)),
     {Props}.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 57ed5f1..dc1840d 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -16,7 +16,7 @@
     get_update_seq/2,
     set_update_seq/3,
 
-    fold_map_idx/5,
+    fold_map_idx/6,
 
     write_doc/4
 ]).
@@ -27,6 +27,7 @@
 -define(VALUE, 2).
 
 
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/src/fabric2.hrl").
 -include("couch_views.hrl").
 
@@ -40,25 +41,22 @@ get_update_seq(TxDb, #mrst{sig = Sig}) ->
         db_prefix := DbPrefix
     } = TxDb,
 
-    Key = get_seq_key(Sig, DbPrefix),
-    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+    case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, Sig))) of
         not_found -> <<>>;
         UpdateSeq -> UpdateSeq
     end.
 
 
-set_view_seq(TxDb, Sig, Seq) ->
+set_update_seq(TxDb, Sig, Seq) ->
     #{
-        tx := Tx
-        db_prefix := DbPrefix,
+        tx := Tx,
+        db_prefix := DbPrefix
     } = TxDb,
-    SeqKey = get_seq_key(Sig, DbPrefix),
-    ok = erlfdb:set(Tx, SeqKey, Seq).
+    ok = erlfdb:set(Tx, seq_key(DbPrefix, Sig), Seq).
 
 
 fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     #{
-        tx := Tx,
         db_prefix := DbPrefix
     } = TxDb,
 
@@ -73,11 +71,11 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
                 docid => undefined,
                 dupe_id => undefined,
                 callback => Callback,
-                acc => Acc0,
+                acc => Acc0
             },
-            {fun fold_fwd/2, FwdAcc}
+            {fun fold_fwd/2, FwdAcc};
         rev ->
-            RevAcc #{
+            RevAcc = #{
                 prefix => MapIdxPrefix,
                 next => value,
                 value => undefined,
@@ -93,24 +91,20 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     fabric2_db:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options).
 
 
-write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) ->
+write_doc(TxDb, Sig, #{deleted := true} = Doc, _ViewIds) ->
     #{
         id := DocId
     } = Doc,
 
     ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
 
-    sclear_id_idx(TxDb, Sig, DocId),
+    clear_id_idx(TxDb, Sig, DocId),
     lists:foreach(fun({ViewId, ViewKeys}) ->
-        clear_map_idx(TxDb, Sig, ViewId, ViewKeys)
+        clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys)
     end, ExistingViewKeys);
 
 write_doc(TxDb, Sig, Doc, ViewIds) ->
     #{
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    #{
         id := DocId,
         results := Results
     } = Doc,
@@ -122,7 +116,7 @@ write_doc(TxDb, Sig, Doc, ViewIds) ->
     lists:foreach(fun({ViewId, NewRows}) ->
         ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []),
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
-        update_map_idx(TxDb, Sig, ViewId, DocId, ExitingKeys, NewRows)
+        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
     end, lists:zip(ViewIds, Results)).
 
 
@@ -131,7 +125,7 @@ fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
         prefix := Prefix
     } = Acc,
 
-    {{SortKey, DocId}, _DupeId, ?VIEW_ROW_KEY} =
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
             erlfdb_tuple:unpack(RowKey, Prefix),
     Acc#{
         next := val,
@@ -147,21 +141,21 @@ fold_fwd({RowKey, EncodedValue}, #{next := val} = Acc) ->
         key := Key,
         sort_key := SortKey,
         docid := DocId,
-        dupe_id := DocId,
-        callback := UserCallback
+        dupe_id := DupeId,
+        callback := UserCallback,
         acc := UserAcc0
     } = Acc,
 
     % We're asserting there that this row is paired
     % correctly with the previous row by relying on
     % a badmatch if any of these values don't match.
-    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} =
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} =
             erlfdb_tuple:unpack(RowKey, Prefix),
 
     Value = couch_views_encoding:decode(EncodedValue),
-    NewAcc = UserCallback(DocId, Key, Value, UserAcc0),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
 
-    #{
+    Acc#{
         next := key,
         key := undefined,
         sort_key := undefined,
@@ -176,7 +170,7 @@ fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) ->
         prefix := Prefix
     } = Acc,
 
-    {{SortKey, DocId}, _DupeId, ?VIEW_ROW_VAL} =
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} =
             erlfdb_tuple:unpack(RowKey, Prefix),
     Acc#{
         next := key,
@@ -192,8 +186,8 @@ fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
         value := Value,
         sort_key := SortKey,
         docid := DocId,
-        dupe_id := DocId,
-        callback := UserCallback
+        dupe_id := DupeId,
+        callback := UserCallback,
         acc := UserAcc0
     } = Acc,
 
@@ -204,9 +198,9 @@ fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
             erlfdb_tuple:unpack(RowKey, Prefix),
 
     Key = couch_views_encoding:decode(EncodedOriginalKey),
-    NewAcc = UserCallback(DocId, Key, Value, UserAcc0),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
 
-    #{
+    Acc#{
         next := val,
         value := undefined,
         sort_key := undefined,
@@ -222,10 +216,10 @@ clear_id_idx(TxDb, Sig, DocId) ->
     } = TxDb,
 
     {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
-    ok = erlfdb:clear_range(Start, End).
+    ok = erlfdb:clear_range(Tx, Start, End).
 
 
-clear_map_idx(TxDb, Sig, ViewId, ViewKeys) ->
+clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys) ->
     #{
         tx := Tx,
         db_prefix := DbPrefix
@@ -269,7 +263,7 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
 
     lists:foreach(fun({DupeId, Key1, Key2, Val}) ->
         KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY),
-        VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VAL),
+        VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VALUE),
         ok = erlfdn:store(Tx, KK, Key2),
         ok = erlfdb:store(Tx, VK, Val)
     end, KVsToAdd).
@@ -280,15 +274,20 @@ get_view_keys(TxDb, Sig, DocId) ->
         tx := Tx,
         db_prefix := DbPrefix
     } = TxDb,
-    {Start, End} = id_idx_range(DbPrefix, Sig, DocId)
+    {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
     lists:map(fun({K, V}) ->
         {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId} =
                 erlfdb_tuple:unpack(K, DbPrefix),
-        ViewKeys = couch_views_encoding:decode(V)
+        ViewKeys = couch_views_encoding:decode(V),
         {ViewId, ViewKeys}
     end, erlfdb:get_range(Tx, Start, End, [])).
 
 
+seq_key(DbPrefix, Sig) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
 id_idx_key(DbPrefix, Sig, DocId, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
@@ -300,12 +299,12 @@ id_idx_range(DbPrefix, Sig, DocId) ->
 
 
 map_idx_prefix(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIES, Sig, ?VIEW_MAP_RANGE, ViewId},
-    erlfdb_tuple:pack(Key).
+    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
 
 
-map_idx_key(MapIdxPrefix, MapKey, DocId, DupeId, Type)
-    Key = {MapKey, DocId, DupeId, Type},
+map_idx_key(MapIdxPrefix, MapKey, DupeId, Type) ->
+    Key = {MapKey, DupeId, Type},
     erldb_tuple:encode(Key, MapIdxPrefix).
 
 
@@ -320,7 +319,7 @@ process_rows(Rows) ->
         EK1 = couch_views_encoding:encode(K, key),
         EK2 = couch_views_encoding:encode(K, value),
         EV = couch_views_encoding:encode(V, value),
-        {EKK, EKV, EV}
+        {EK1, EK2, EV}
     end, Rows),
 
     Grouped = lists:foldl(fun({K1, K2, V}, Acc) ->
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 1a84116..91072a1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -21,10 +21,10 @@
     init/0
 ]).
 
--include_lib("couch_views/include/couch_views.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/src/fabric2.hrl").
--include_lib("eunit/include/eunit.hrl").
+-include("couch_views.hrl").
 
 % TODO:
 %  * Handle timeouts of transaction and other errors
@@ -57,6 +57,8 @@ init() ->
         db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
+        job => Job,
+        job_data => Data,
         count => 0,
         limit => num_changes(),
         doc_acc => [],
@@ -67,7 +69,7 @@ init() ->
 
 
 update(#{} = Db, Mrst0, State0) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
+    {Mrst2, State3} = fabric2_fdb:transactional(Db, fun(TxDb) ->
         % In the first iteration of update we need
         % to populate our db and view sequences
         State1 = case State0 of
@@ -75,7 +77,7 @@ update(#{} = Db, Mrst0, State0) ->
                 State0#{
                     tx_db := TxDb,
                     db_seq := fabric2_db:get_update_seq(TxDb),
-                    view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst)
+                    view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst0)
                 };
             _ ->
                 State0#{
@@ -93,53 +95,29 @@ update(#{} = Db, Mrst0, State0) ->
         } = State2,
 
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
-        write_docs(Db, Mrst1, MappedResults, State2),
+        write_docs(Db, Mrst1, MappedDocs, State2),
 
         case Count < Limit of
             true ->
-                report_progress(State2, finished);
+                report_progress(State2, finished),
+                {Mrst1, finished};
             false ->
                 report_progress(State2, update),
-                State3 = maps:merge(FinalState, #{
-                    count => 0,
-                    doc_acc => [],
-                    db_seq => LastSeq,
-                    last_seq => 0,
-                    mrst => Mrst1
-                }),
-
-    end).
-
-
-update_int(#{} = Db, State) ->
-
-
-    #{
-        count := Count,
-        limit := Limit,
-        doc_acc := DocAcc,
-        last_seq := LastSeq,
-        callback := Cb,
-        callback_args := CallbackArgs,
-        mrst := Mrst
-    } = FinalState,
-
-    {MappedDocs, Mrst1} = map_docs(Mrst, DocAcc),
-    write_docs(Db, Mrst1, MappedDocs, FinalState),
-
-    case Count < Limit of
-        true ->
-            Cb(undefined, finished, CallbackArgs, Db, Mrst, LastSeq);
-        false ->
-            NextState = maps:merge(FinalState, #{
-                limit => Limit,
-                count => 0,
-                doc_acc => [],
-                since_seq => LastSeq,
-                last_seq => 0,
-                mrst => Mrst1
-            }),
-            update_int(Db, NextState)
+                {Mrst1, State2#{
+                    tx_db := undefined,
+                    count := 0,
+                    doc_acc := [],
+                    view_seq := LastSeq,
+                    last_seq := undefined
+                }}
+        end
+    end),
+
+    case State3 of
+        finished ->
+            couch_query_servers:stop_doc_map(Mrst2#mrst.qserver);
+        _ ->
+            update(Db, Mrst2, State3)
     end.
 
 
@@ -172,7 +150,6 @@ process_changes(Change, Acc) ->
 
     Acc1 = case {Id, IncludeDesign} of
         {<<"_design/", _/binary>>, false} ->
-            % {ok, Doc} = fabric2_db:open_doc(Db, Id),
             maps:merge(Acc, #{
                 count => Count + 1,
                 last_seq => LastSeq
@@ -186,11 +163,11 @@ process_changes(Change, Acc) ->
             end,
 
             Change1 = maps:put(doc, Doc, Change),
-            maps:merge(Acc, #{
-                doc_acc => DocAcc ++ [Change1],
-                count => Count + 1,
-                last_seq => LastSeq
-            })
+            Acc#{
+                doc_acc := DocAcc ++ [Change1],
+                count := Count + 1,
+                last_seq := LastSeq
+            }
     end,
     {ok, Acc1}.
 
@@ -247,10 +224,22 @@ start_query_server(#mrst{} = Mrst) ->
 
 
 report_progress(State, UpdateType) ->
+    #{
+        tx_db := TxDb,
+        job := Job,
+        job_data := JobData,
+        last_seq := LastSeq
+    } = State,
+
+    NewData = JobData#{view_seq => LastSeq},
+
     case UpdateType of
         update ->
-            couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq);
+            couch_jobs:update(TxDb, Job, NewData);
         finished ->
-            couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq)
+            couch_jobs:finish(TxDb, Job, NewData)
     end.
 
+
+num_changes() ->
+    config:get_integer("couch_views", "change_limit", 100).
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 9714b29..9e299af 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -19,6 +19,7 @@
 ]).
 
 
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include("couch_views.hrl").
 
 
@@ -26,18 +27,18 @@ set_timeout() ->
     couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000).
 
 
-build_view(Db, Mrst, UpdateSeq) ->
-    {ok, JobId} = build_view_async(Db, Mrst),
+build_view(TxDb, Mrst, UpdateSeq) ->
+    {ok, JobId} = build_view_async(TxDb, Mrst),
     case wait_for_job(JobId, UpdateSeq) of
         ok -> ok;
-        retry -> build_view(Db, Mrst, UpdateSeq)
+        retry -> build_view(TxDb, Mrst, UpdateSeq)
     end.
 
 
-build_view_async(Db, Mrst) ->
-    JobId = create_job_id(TxDb, Mrst),
-    JobData = create_job_data(TxDb, Mrst),
-    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData).
+build_view_async(TxDb, Mrst) ->
+    JobId = job_id(TxDb, Mrst),
+    JobData = job_data(TxDb, Mrst),
+    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),
     {ok, JobId}.
 
 
@@ -45,7 +46,7 @@ build_view_async(Db, Mrst) ->
 wait_for_job(JobId, UpdateSeq) ->
     case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
         {ok, Subscription, _State, _Data} ->
-            wait_for_job(JobId, Subscription, UpdateSeq)
+            wait_for_job(JobId, Subscription, UpdateSeq);
         {ok, finished, Data} ->
             case Data of
                 #{view_seq := ViewSeq} when ViewSeq >= UpdateSeq ->
@@ -57,7 +58,7 @@ wait_for_job(JobId, UpdateSeq) ->
 
 
 wait_for_job(JobId, Subscription, UpdateSeq) ->
-    case wait(Subscription, infinity) of
+    case wait(Subscription) of
         {error, Error} ->
             erlang:error(Error);
         {finished, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq ->
@@ -72,14 +73,14 @@ wait_for_job(JobId, Subscription, UpdateSeq) ->
     end.
 
 
-get_id(#{name := DbName}, #mrst{sig = Sig}) ->
-    create_job_id(DbName, Sig);
+job_id(#{name := DbName}, #mrst{sig = Sig}) ->
+    job_id(DbName, Sig);
 
-get_id(DbName, Sig) ->
+job_id(DbName, Sig) ->
     <<DbName/binary, Sig/binary>>.
 
 
-create_job_data(Db, Mrst) ->
+job_data(Db, Mrst) ->
     #mrst{
         idx_name = DDocId,
         sig = Sig
@@ -93,7 +94,7 @@ create_job_data(Db, Mrst) ->
 
 
 wait(Subscription) ->
-    case couch_jobs:wait(JobSubscription, infinity) of
+    case couch_jobs:wait(Subscription, infinity) of
         {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData};
         timeout -> {error, timeout}
     end.
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 56b23f2..8d2bf5a 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -19,6 +19,7 @@
 
 -include("couch_views.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/src/fabric2.hrl").
 
 
@@ -39,7 +40,7 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
 
         fabric2_fdb:transactional(Db, fun(TxDb) ->
             Acc0 = #{
-                db => TxDb
+                db => TxDb,
                 skip => Args#mrargs.skip,
                 mrargs => Args,
                 callback => UserCallback,
@@ -59,7 +60,7 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
                 acc := UserAcc2
             } = Acc1,
 
-            maybe_stop(Callback(complete, UserAcc2)
+            maybe_stop(UserCallback(complete, UserAcc2))
         end)
     catch throw:{done, Out} ->
         {ok, Out}
@@ -71,10 +72,10 @@ handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
 
 handle_row(DocId, Key, Value, Acc) ->
     #{
-        db := TxDb
+        db := TxDb,
         mrargs := Args,
         callback := UserCallback,
-        acc := UserAcc
+        acc := UserAcc0
     } = Acc,
 
     BaseRow = [
@@ -83,21 +84,21 @@ handle_row(DocId, Key, Value, Acc) ->
         {value, Value}
     ],
 
-    Row = BaseRow ++ if not IncludeDocs -> []; true ->
+    Row = BaseRow ++ if not Args#mrargs.include_docs -> []; true ->
         DocOpts0 = Args#mrargs.doc_options,
-        DocOpts1 = OpenOpts0 ++ case Args#mrargs.conflicts of
+        DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of
             true -> [conflicts];
             false -> []
         end,
-        DocObj = case fabric2_db:open_doc(Db, DocId, DocOpts1) of
+        DocObj = case fabric2_db:open_doc(TxDb, DocId, DocOpts1) of
             {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts1);
             {not_found, _} -> null
         end,
         [{doc, DocObj}]
     end,
 
-    UserAcc1 = maybe_stop(Callback({row, Row1}, UserAcc0)),
-    Acc#{acc := UserAcc1}
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{acc := UserAcc1}.
 
 
 get_view_id(ViewName, Views) ->
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index 8ec2425..0417a9b 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -41,7 +41,7 @@ start_link() ->
 init(_) ->
     process_flag(trap_exit, true),
     couch_views_jobs:set_timeout(),
-    State0 = #{
+    State = #{
         workers => [],
         num_workers => num_workers()
     },
@@ -70,7 +70,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
         NewWorkers ->
             if Reason == normal -> ok; true ->
                 LogMsg = "~p : indexer process ~p exited with ~p",
-                couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
             end,
             {noreply, spawn_workers(State#{workers := NewWorkers})}
     end;
@@ -90,36 +90,12 @@ spawn_workers(State) ->
     } = State,
     case length(Workers) < NumWorkers of
         true ->
-            Pid = spawn_worker(),
+            Pid = couch_views_indexer:spawn_link(),
             spawn_workers(State#{workers := [Pid | Workers]});
         false ->
             State
     end.
 
 
-spawn_worker() ->
-    couch_views_indexer:spawn_link().
-
-
-blocking_acceptor(Parent) ->
-    case couch_views_jobs:accept() of
-        not_found ->
-            blocking_acceptor(Parent);
-        {ok, Job, JobData} ->
-            gen_server:cast(Parent, {job, Job, JobData})
-    end.
-
-
-check_finished_process(#{acceptor_pid := Pid} = State, Pid) ->
-    State1 = State#{acceptor_pid := undefined},
-    spawn_acceptor(State1);
-
-check_finished_process(State, Pid) ->
-    #{workers := Workers} = State,
-    Workers1 = maps:remove(Pid, Workers),
-    State#{workers := Workers1}.
-
-
-
 num_workers() ->
     config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl
index d7ed29f..b88cfcd 100644
--- a/src/couch_views/src/couch_views_util.erl
+++ b/src/couch_views/src/couch_views_util.erl
@@ -19,6 +19,7 @@
 
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include("couch_views.hrl").
 
 


Mime
View raw message