couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 02/02: Implement _all_dbs/_all_docs API parameters
Date Thu, 11 Jul 2019 21:19:46 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 563dd67dc2b9bb9640125d00126c6effbe9256a3
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Wed Jun 19 11:58:47 2019 -0500

    Implement _all_dbs/_all_docs API parameters
    
    This adds the mapping of CouchDB start/end keys and so on to the similar
    yet slightly different concepts in FoundationDB. The handlers for
    `_all_dbs` and `_all_docs` have been updated to use this new logic.
---
 src/chttpd/src/chttpd_changes.erl          |  10 +-
 src/chttpd/src/chttpd_db.erl               | 220 +++++++++++++++++----------
 src/chttpd/src/chttpd_misc.erl             |  67 ++++----
 src/fabric/src/fabric2_db.erl              | 143 +++++++++++++++--
 src/fabric/src/fabric2_fdb.erl             | 236 ++++++++++++++---------------
 src/fabric/test/fabric2_doc_fold_tests.erl |  84 +++++++++-
 test/elixir/test/all_docs_test.exs         |   3 +-
 7 files changed, 512 insertions(+), 251 deletions(-)

diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 0e03482..c9107d1 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -871,15 +871,19 @@ changes_row(Results, Change, Acc) ->
 maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
     #changes_acc{
         db = Db,
-        doc_options = DocOpts,
+        doc_options = DocOpts0,
         conflicts = Conflicts,
         filter = Filter
     } = Acc,
-    Opts = case Conflicts of
+    OpenOpts = case Conflicts of
         true -> [deleted, conflicts];
         false -> [deleted]
     end,
-    load_doc(Db, Value, Opts, DocOpts, Filter);
+    DocOpts1 = case Conflicts of
+        true -> [conflicts | DocOpts0];
+        false -> DocOpts0
+    end,
+    load_doc(Db, Value, OpenOpts, DocOpts1, Filter);
 
 maybe_get_changes_doc(_Value, _Acc) ->
     [].
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c0ac1ca..90869c6 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -16,6 +16,7 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
 -export([handle_request/1, handle_compact_req/2, handle_design_req/2,
@@ -825,21 +826,151 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
     {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
     chttpd:end_delayed_json_response(Resp1).
 
-all_docs_view(Req, Db, _Keys, _OP) ->
-    % Args0 = couch_mrview_http:parse_params(Req, Keys),
-    % Args1 = Args0#mrargs{view_type=map},
-    % Args2 = fabric_util:validate_all_docs_args(Db, Args1),
-    % Args3 = set_namespace(OP, Args2),
-    Options = [{user_ctx, Req#httpd.user_ctx}],
+all_docs_view(Req, Db, Keys, OP) ->
+    Args0 = couch_mrview_http:parse_params(Req, Keys),
+    Args1 = set_namespace(OP, Args0),
     Max = chttpd:chunked_response_buffer_size(),
-    VAcc = #vacc{db=Db, req=Req, threshold=Max},
-    {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
-    {ok, Resp#vacc.resp}.
+    VAcc0 = #vacc{
+        db = Db,
+        req = Req,
+        threshold = Max
+    },
+    case Args1#mrargs.keys of
+        undefined ->
+            Options = [
+                {user_ctx, Req#httpd.user_ctx},
+                {dir, Args1#mrargs.direction},
+                {start_key, Args1#mrargs.start_key},
+                {end_key, Args1#mrargs.end_key},
+                {limit, Args1#mrargs.limit},
+                {skip, Args1#mrargs.skip},
+                {update_seq, Args1#mrargs.update_seq}
+            ],
+            Acc = {iter, Db, Args1, VAcc0},
+            {ok, {iter, _, _, Resp}} =
+                    fabric2_db:fold_docs(Db, fun view_cb/2, Acc, Options),
+            {ok, Resp#vacc.resp};
+        Keys0 when is_list(Keys0) ->
+            Keys1 = apply_args_to_keylist(Args1, Keys0),
+            %% namespace can be _set_ to `undefined`, so we
+            %% want simulate enum here
+            NS = case couch_util:get_value(namespace, Args1#mrargs.extra) of
+                <<"_all_docs">> -> <<"_all_docs">>;
+                <<"_design">> -> <<"_design">>;
+                <<"_local">> -> <<"_local">>;
+                _ -> <<"_all_docs">>
+            end,
+            TotalRows = fabric2_db:get_doc_count(Db, NS),
+            Meta = case Args1#mrargs.update_seq of
+                true ->
+                    UpdateSeq = fabric2_db:get_update_seq(Db),
+                    [{update_seq, UpdateSeq}];
+                false ->
+                    []
+            end ++ [{total, TotalRows}, {offset, null}],
+            {ok, VAcc1} = view_cb({meta, Meta}, VAcc0),
+            DocOpts = case Args1#mrargs.conflicts of
+                true -> [conflicts | Args1#mrargs.doc_options];
+                _ -> Args1#mrargs.doc_options
+            end ++ [{user_ctx, Req#httpd.user_ctx}],
+            IncludeDocs = Args1#mrargs.include_docs,
+            VAcc2 = lists:foldl(fun(DocId, Acc) ->
+                OpenOpts = [deleted | DocOpts],
+                Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+                    {not_found, missing} ->
+                        #view_row{key = DocId};
+                    {ok, #doc{deleted = true, revs = Revs}} ->
+                        {RevPos, [RevId | _]} = Revs,
+                        Value = {[
+                            {rev, couch_doc:rev_to_str({RevPos, RevId})},
+                            {deleted, true}
+                        ]},
+                        DocValue = if not IncludeDocs -> undefined; true ->
+                            null
+                        end,
+                        #view_row{
+                            key = DocId,
+                            id = DocId,
+                            value = Value,
+                            doc = DocValue
+                        };
+                    {ok, #doc{revs = Revs} = Doc0} ->
+                        {RevPos, [RevId | _]} = Revs,
+                        Value = {[
+                            {rev, couch_doc:rev_to_str({RevPos, RevId})}
+                        ]},
+                        DocValue = if not IncludeDocs -> undefined; true ->
+                            couch_doc:to_json_obj(Doc0, DocOpts)
+                        end,
+                        #view_row{
+                            key = DocId,
+                            id = DocId,
+                            value = Value,
+                            doc = DocValue
+                        }
+                end,
+                Row1 = fabric_view:transform_row(Row0),
+                {ok, NewAcc} = view_cb(Row1, Acc),
+                NewAcc
+            end, VAcc1, Keys1),
+            {ok, VAcc3} = view_cb(complete, VAcc2),
+            {ok, VAcc3#vacc.resp}
+    end.
+
+
+apply_args_to_keylist(Args, Keys0) ->
+    Keys1 = case Args#mrargs.direction of
+        fwd -> Keys0;
+        _ -> lists:reverse(Keys0)
+    end,
+    Keys2 = case Args#mrargs.skip < length(Keys1) of
+        true -> lists:nthtail(Args#mrargs.skip, Keys1);
+        false -> []
+    end,
+    case Args#mrargs.limit < length(Keys2) of
+        true -> lists:sublist(Keys2, Args#mrargs.limit);
+        false -> Keys2
+    end.
+
+
+view_cb({row, Row}, {iter, Db, Args, VAcc}) ->
+    NewRow = case lists:keymember(doc, 1, Row) of
+        true ->
+            chttpd_stats:incr_reads();
+        false when Args#mrargs.include_docs ->
+            {id, DocId} = lists:keyfind(id, 1, Row),
+            chttpd_stats:incr_reads(),
+            DocOpts = case Args#mrargs.conflicts of
+                true -> [conflicts | Args#mrargs.doc_options];
+                _ -> Args#mrargs.doc_options
+            end ++ [{user_ctx, (VAcc#vacc.req)#httpd.user_ctx}],
+            OpenOpts = [deleted | DocOpts],
+            DocMember = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+                {not_found, missing} ->
+                    [];
+                {ok, #doc{deleted = true}} ->
+                    [{doc, null}];
+                {ok, #doc{} = Doc} ->
+                    [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+            end,
+            Row ++ DocMember;
+        _ ->
+            Row
+    end,
+    chttpd_stats:incr_rows(),
+    {Go, NewVAcc} = couch_mrview_http:view_cb({row, NewRow}, VAcc),
+    {Go, {iter, Db, Args, NewVAcc}};
+
+view_cb(Msg, {iter, Db, Args, VAcc}) ->
+    {Go, NewVAcc} = couch_mrview_http:view_cb(Msg, VAcc),
+    {Go, {iter, Db, Args, NewVAcc}};
 
 view_cb({row, Row} = Msg, Acc) ->
     case lists:keymember(doc, 1, Row) of
-        true -> chttpd_stats:incr_reads();
-        false -> ok
+        true ->
+            chttpd_stats:incr_reads();
+        false ->
+            ok
     end,
     chttpd_stats:incr_rows(),
     couch_mrview_http:view_cb(Msg, Acc);
@@ -2005,70 +2136,3 @@ bulk_get_json_error(DocId, Rev, Error, Reason) ->
                              {<<"rev">>, Rev},
                              {<<"error">>, Error},
                              {<<"reason">>, Reason}]}}]}).
-
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-monitor_attachments_test_() ->
-    {"ignore stubs",
-        fun () ->
-            Atts = [couch_att:new([{data, stub}])],
-            ?_assertEqual([], monitor_attachments(Atts))
-        end
-    }.
-
-parse_partitioned_opt_test_() ->
-    {
-        foreach,
-        fun setup/0,
-        fun teardown/1,
-        [
-            t_should_allow_partitioned_db(),
-            t_should_throw_on_not_allowed_partitioned_db(),
-            t_returns_empty_array_for_partitioned_false(),
-            t_returns_empty_array_for_no_partitioned_qs()
-        ]
-    }.
-
-
-setup() ->
-    ok.
-
-teardown(_) ->
-    meck:unload().
-
-mock_request(Url) ->
-    Headers = mochiweb_headers:make([{"Host", "examples.com"}]),
-    MochiReq = mochiweb_request:new(nil, 'PUT', Url, {1, 1}, Headers),
-    #httpd{mochi_req = MochiReq}.
-
-t_should_allow_partitioned_db() ->
-    ?_test(begin
-        meck:expect(couch_flags, is_enabled, 2, true),
-        Req = mock_request("/all-test21?partitioned=true"),
-        [Partitioned, _] = parse_partitioned_opt(Req),
-        ?assertEqual(Partitioned, {partitioned, true})
-    end).
-
-t_should_throw_on_not_allowed_partitioned_db() ->
-    ?_test(begin
-        meck:expect(couch_flags, is_enabled, 2, false),
-        Req = mock_request("/all-test21?partitioned=true"),
-        Throw = {bad_request, <<"Partitioned feature is not enabled.">>},
-        ?assertThrow(Throw, parse_partitioned_opt(Req))
-    end).
-
-t_returns_empty_array_for_partitioned_false() ->
-    ?_test(begin
-        Req = mock_request("/all-test21?partitioned=false"),
-        ?assertEqual(parse_partitioned_opt(Req), [])
-    end).
-
-t_returns_empty_array_for_no_partitioned_qs() ->
-    ?_test(begin
-        Req = mock_request("/all-test21"),
-        ?assertEqual(parse_partitioned_opt(Req), [])
-    end).
-
--endif.
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index b244e84..e5f0002 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -108,39 +108,54 @@ maybe_add_csp_headers(Headers, _) ->
     Headers.
 
 handle_all_dbs_req(#httpd{method='GET'}=Req) ->
-    % TODO: Support args and options properly, transform
-    % this back into a fold call similar to the old
-    % version.
-    %% Args = couch_mrview_http:parse_params(Req, undefined),
+    #mrargs{
+        start_key = StartKey,
+        end_key = EndKey,
+        direction = Dir,
+        limit = Limit,
+        skip = Skip
+    } = couch_mrview_http:parse_params(Req, undefined),
+
+    Options = [
+        {start_key, StartKey},
+        {end_key, EndKey},
+        {dir, Dir},
+        {limit, Limit},
+        {skip, Skip}
+    ],
+
     % Eventually the Etag for this request will be derived
     % from the \xFFmetadataVersion key in fdb
     Etag = <<"foo">>,
-    %% Options = [{user_ctx, Req#httpd.user_ctx}],
+
     {ok, Resp} = chttpd:etag_respond(Req, Etag, fun() ->
-        AllDbs = fabric2_db:list_dbs(),
-        chttpd:send_json(Req, AllDbs)
-    end);
+        {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]),
+        Callback = fun all_dbs_callback/2,
+        Acc = #vacc{req=Req,resp=Resp},
+        fabric2_db:list_dbs(Callback, Acc, Options)
+    end),
+    case is_record(Resp, vacc) of
+        true -> {ok, Resp#vacc.resp};
+        _ -> {ok, Resp}
+    end;
 handle_all_dbs_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
-%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
-%%     {ok, Acc#vacc{resp=Resp1}};
-%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
-%%     Prepend = couch_mrview_http:prepend_val(Acc),
-%%     case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
-%%         {ok, Acc};
-%%     DbName ->
-%%         {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
-%%         {ok, Acc#vacc{prepend=",", resp=Resp1}}
-%%     end;
-%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
-%%     {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
-%%     {ok, Acc#vacc{resp=Resp2}};
-%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
-%%     {ok, Acc#vacc{resp=Resp1}}.
+all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
+    {ok, Acc#vacc{resp=Resp1}};
+all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
+    Prepend = couch_mrview_http:prepend_val(Acc),
+    DbName = couch_util:get_value(id, Row),
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
+    {ok, Acc#vacc{prepend=",", resp=Resp1}};
+all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
+    {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+    {ok, Acc#vacc{resp=Resp2}};
+all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
+    {ok, Acc#vacc{resp=Resp1}}.
 
 handle_dbs_info_req(#httpd{method='POST'}=Req) ->
     chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 80028a6..eb74a18 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -20,6 +20,7 @@
 
     list_dbs/0,
     list_dbs/1,
+    list_dbs/3,
 
     is_admin/1,
     check_is_admin/1,
@@ -194,8 +195,30 @@ list_dbs() ->
 
 
 list_dbs(Options) ->
+    Callback = fun(DbName, Acc) -> [DbName | Acc] end,
+    DbNames = fabric2_fdb:transactional(fun(Tx) ->
+        fabric2_fdb:list_dbs(Tx, Callback, [], Options)
+    end),
+    lists:reverse(DbNames).
+
+
+list_dbs(UserFun, UserAcc0, Options) ->
+    FoldFun = fun
+        (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
+    end,
     fabric2_fdb:transactional(fun(Tx) ->
-        fabric2_fdb:list_dbs(Tx, Options)
+        try
+            UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
+            UserAcc2 = fabric2_fdb:list_dbs(
+                    Tx,
+                    FoldFun,
+                    UserAcc1,
+                    Options
+                ),
+            {ok, maybe_stop(UserFun(complete, UserAcc2))}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -406,6 +429,7 @@ open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) ->
 open_doc(#{} = Db, DocId, Options) ->
     NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
     NeedsTree = (Options -- NeedsTreeOpts /= Options),
+    OpenDeleted = lists:member(deleted, Options),
     fabric2_fdb:transactional(Db, fun(TxDb) ->
         Revs = case NeedsTree of
             true -> fabric2_fdb:get_all_revs(TxDb, DocId);
@@ -414,6 +438,8 @@ open_doc(#{} = Db, DocId, Options) ->
         if Revs == [] -> {not_found, missing}; true ->
             #{winner := true} = RI = lists:last(Revs),
             case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of
+                #doc{deleted = true} when not OpenDeleted ->
+                    {not_found, deleted};
                 #doc{} = Doc ->
                     apply_open_doc_opts(Doc, Revs, Options);
                 Else ->
@@ -451,8 +477,10 @@ open_doc_revs(Db, DocId, Revs, Options) ->
                         rev_path => RevPath
                     },
                     case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of
-                        #doc{} = Doc -> {ok, Doc};
-                        Else -> {Else, {Pos, Rev}}
+                        #doc{} = Doc ->
+                            apply_open_doc_opts(Doc, AllRevInfos, Options);
+                        Else ->
+                            {Else, {Pos, Rev}}
                     end
             end
         end, Found),
@@ -615,9 +643,40 @@ fold_docs(Db, UserFun, UserAcc) ->
     fold_docs(Db, UserFun, UserAcc, []).
 
 
-fold_docs(Db, UserFun, UserAcc, Options) ->
+fold_docs(Db, UserFun, UserAcc0, Options) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
+        try
+            #{
+                db_prefix := DbPrefix
+            } = TxDb,
+
+            Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
+            DocCount = get_doc_count(TxDb),
+
+            Meta = case lists:keyfind(update_seq, 1, Options) of
+                {_, true} ->
+                    UpdateSeq = fabric2_db:get_update_seq(TxDb),
+                    [{update_seq, UpdateSeq}];
+                _ ->
+                    []
+            end ++ [{total, DocCount}, {offset, null}],
+
+            UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),
+
+            UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+                {DocId} = erlfdb_tuple:unpack(K, Prefix),
+                RevId = erlfdb_tuple:unpack(V),
+                maybe_stop(UserFun({row, [
+                    {id, DocId},
+                    {key, DocId},
+                    {value, {[{rev, couch_doc:rev_to_str(RevId)}]}}
+                ]}, Acc))
+            end, UserAcc1, Options),
+
+            {ok, maybe_stop(UserFun(complete, UserAcc2))}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -627,7 +686,44 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
 
 fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
+        try
+            #{
+                db_prefix := DbPrefix
+            } = TxDb,
+
+            Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+
+            Dir = case fabric2_util:get_value(dir, Options, fwd) of
+                rev -> rev;
+                _ -> fwd
+            end,
+
+            StartKey = get_since_seq(TxDb, Dir, SinceSeq),
+            EndKey = case Dir of
+                rev -> fabric2_util:seq_zero_vs();
+                _ -> fabric2_util:seq_max_vs()
+            end,
+            FoldOpts = [
+                {start_key, StartKey},
+                {end_key, EndKey}
+            ] ++ Options,
+
+            {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+                {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
+                {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
+
+                Change = #{
+                    id => DocId,
+                    sequence => fabric2_fdb:vs_to_seq(SeqVS),
+                    rev_id => RevId,
+                    deleted => Deleted
+                },
+
+                maybe_stop(UserFun(Change, Acc))
+            end, UserAcc, FoldOpts)}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -796,7 +892,6 @@ apply_open_doc_opts(Doc, Revs, Options) ->
     IncludeConflicts = lists:member(conflicts, Options),
     IncludeDelConflicts = lists:member(deleted_conflicts, Options),
     IncludeLocalSeq = lists:member(local_seq, Options),
-    ReturnDeleted = lists:member(deleted, Options),
 
     % This revs_info becomes fairly useless now that we're
     % not keeping old document bodies around...
@@ -827,14 +922,7 @@ apply_open_doc_opts(Doc, Revs, Options) ->
         [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
     end,
 
-    case Doc#doc.deleted and not ReturnDeleted of
-        true ->
-            {not_found, deleted};
-        false ->
-            {ok, Doc#doc{
-                meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4
-            }}
-    end.
+    {ok, Doc#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}.
 
 
 filter_found_revs(RevInfo, Revs) ->
@@ -1289,6 +1377,26 @@ check_duplicate_attachments(#doc{atts = Atts}) ->
     end, ordsets:new(), Atts).
 
 
+get_since_seq(Db, rev, <<>>) ->
+    get_since_seq(Db, rev, now);
+
+get_since_seq(_Db, _Dir, Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
+    fabric2_util:seq_zero_vs();
+
+get_since_seq(Db, Dir, Seq) when Seq == now; Seq == <<"now">> ->
+    CurrSeq = fabric2_fdb:get_last_change(Db),
+    get_since_seq(Db, Dir, CurrSeq);
+
+get_since_seq(_Db, _Dir, Seq) when is_binary(Seq), size(Seq) == 24 ->
+    fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(Seq));
+
+get_since_seq(Db, Dir, List) when is_list(List) ->
+    get_since_seq(Db, Dir, list_to_binary(List));
+
+get_since_seq(_Db, _Dir, Seq) ->
+    erlang:error({invalid_since_seq, Seq}).
+
+
 get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
     LeafPath;
 get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1353,3 +1461,8 @@ rev(Rev) when is_list(Rev); is_binary(Rev) ->
 rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
     Rev.
 
+
+maybe_stop({ok, Acc}) ->
+    Acc;
+maybe_stop({stop, Acc}) ->
+    throw({stop, Acc}).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 4b01826..670ce8b 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,7 +24,7 @@
     delete/1,
     exists/1,
 
-    list_dbs/2,
+    list_dbs/4,
 
     get_info/1,
     get_config/1,
@@ -50,11 +50,13 @@
     read_attachment/3,
     write_attachment/3,
 
-    fold_docs/4,
-    fold_changes/5,
     get_last_change/1,
 
+    fold_range/5,
+
     vs_to_seq/1,
+    seq_to_vs/1,
+    next_vs/1,
 
     debug_cluster/0,
     debug_cluster/2
@@ -254,16 +256,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
     end.
 
 
-list_dbs(Tx, _Options) ->
+list_dbs(Tx, Callback, AccIn, Options) ->
     Root = erlfdb_directory:root(),
     CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
     LayerPrefix = erlfdb_directory:get_name(CouchDB),
-    {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
-    Future = erlfdb:get_range(Tx, Start, End),
-    lists:map(fun({K, _V}) ->
-        {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
-        DbName
-    end, erlfdb:wait(Future)).
+    Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+    fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+        {DbName} = erlfdb_tuple:unpack(K, Prefix),
+        Callback(DbName, Acc)
+    end, AccIn, Options).
 
 
 get_info(#{} = Db) ->
@@ -508,24 +509,26 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
     UpdateStatus = case {OldWinner, NewWinner} of
         {not_found, #{deleted := false}} ->
             created;
+        {not_found, #{deleted := true}} ->
+            deleted;
         {#{deleted := true}, #{deleted := false}} ->
             recreated;
         {#{deleted := false}, #{deleted := false}} ->
             updated;
         {#{deleted := false}, #{deleted := true}} ->
+            deleted;
+        {#{deleted := true}, #{deleted := true}} ->
             deleted
     end,
 
     case UpdateStatus of
-        Status when Status == created orelse Status == recreated ->
-            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
-            ADVal = erlfdb_tuple:pack(NewRevId),
-            ok = erlfdb:set(Tx, ADKey, ADVal);
         deleted ->
             ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
             ok = erlfdb:clear(Tx, ADKey);
-        updated ->
-            ok
+        _ ->
+            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+            ADVal = erlfdb_tuple:pack(NewRevId),
+            ok = erlfdb:set(Tx, ADKey, ADVal)
     end,
 
     % _changes
@@ -640,84 +643,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
     {ok, AttId}.
 
 
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = ensure_current(Db),
-
-    {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
-
-    DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
-    DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
-
-    try
-        UserAcc1 = maybe_stop(UserFun({meta, [
-            {total, ?bin2uint(DocCountBin)},
-            {offset, null}
-        ]}, UserAcc0)),
-
-        UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
-            RevId = erlfdb_tuple:unpack(V),
-            maybe_stop(UserFun({row, [
-                {id, DocId},
-                {key, DocId},
-                {value, couch_doc:rev_to_str(RevId)}
-            ]}, UserAccIn))
-        end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
-        {ok, maybe_stop(UserFun(complete, UserAcc2))}
-    catch throw:{stop, FinalUserAcc} ->
-        {ok, FinalUserAcc}
-    end.
-
-
-fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = ensure_current(Db),
-
-    SinceSeq1 = get_since_seq(SinceSeq0),
-
-    Reverse = case fabric2_util:get_value(dir, Options, fwd) of
-        fwd -> false;
-        rev -> true
-    end,
-
-    {Start0, End0} = case Reverse of
-        false -> {SinceSeq1, fabric2_util:seq_max_vs()};
-        true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
-    end,
-
-    Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
-    End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
-
-    {Start, End} = case Reverse of
-        false -> {erlfdb_key:first_greater_than(Start1), End1};
-        true -> {Start1, erlfdb_key:first_greater_than(End1)}
-    end,
-
-    try
-        {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
-            {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
-
-            Change = #{
-                id => DocId,
-                sequence => vs_to_seq(SeqVS),
-                rev_id => RevId,
-                deleted => Deleted
-            },
-
-            maybe_stop(UserFun(Change, UserAccIn))
-        end, UserAcc0, [{reverse, Reverse}] ++ Options)}
-    catch throw:{stop, FinalUserAcc} ->
-        {ok, FinalUserAcc}
-    end.
-
-
 get_last_change(#{} = Db) ->
     #{
         tx := Tx,
@@ -735,17 +660,57 @@ get_last_change(#{} = Db) ->
     end.
 
 
-maybe_stop({ok, Acc}) ->
-    Acc;
-maybe_stop({stop, Acc}) ->
-    throw({stop, Acc}).
+fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+    #{
+        tx := Tx
+    } = ensure_current(Db),
+    fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+
+fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
+    case fabric2_util:get_value(limit, Options) of
+        0 ->
+            % FoundationDB treats a limit of 0 as unlimited
+            % so we have to guard for that here.
+            UserAcc;
+        _ ->
+            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
+            Callback = fun fold_range_cb/2,
+            Acc = {skip, Skip, UserCallback, UserAcc},
+            {skip, _, UserCallback, OutAcc} =
+                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
+            OutAcc
+    end.
 
 
-vs_to_seq(VS) ->
+vs_to_seq(VS) when is_tuple(VS) ->
+    % 51 is the versionstamp type tag
     <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
     fabric2_util:to_hex(SeqBin).
 
 
+seq_to_vs(Seq) when is_binary(Seq) ->
+    Seq1 = fabric2_util:from_hex(Seq),
+    % 51 is the versionstamp type tag
+    Seq2 = <<51:8, Seq1/binary>>,
+    {VS} = erlfdb_tuple:unpack(Seq2),
+    VS.
+
+
+next_vs({versionstamp, VS, Batch, TxId}) ->
+    {V, B, T} = case TxId =< 65535 of
+        true ->
+            {VS, Batch, TxId + 1};
+        false ->
+            case Batch =< 65535 of
+                true ->
+                    {VS, Batch + 1, 0};
+                false ->
+                    {VS + 1, 0, 0}
+            end
+    end,
+    {versionstamp, V, B, T}.
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
@@ -753,7 +718,7 @@ debug_cluster() ->
 debug_cluster(Start, End) ->
     transactional(fun(Tx) ->
         lists:foreach(fun({Key, Val}) ->
-            io:format("~s => ~s~n", [
+            io:format(standard_error, "~s => ~s~n", [
                     string:pad(erlfdb_util:repr(Key), 60),
                     erlfdb_util:repr(Val)
                 ])
@@ -790,7 +755,7 @@ load_validate_doc_funs(#{} = Db) ->
         {end_key, <<"_design0">>}
     ],
 
-    {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+    {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
 
     Infos2 = lists:map(fun(Info) ->
         #{
@@ -999,11 +964,12 @@ chunkify_attachment(Data) ->
     end.
 
 
-get_dir_and_bounds(DbPrefix, Options) ->
-    Reverse = case fabric2_util:get_value(dir, Options, fwd) of
-        fwd -> false;
-        rev -> true
+get_fold_opts(RangePrefix, Options) ->
+    Reverse = case fabric2_util:get_value(dir, Options) of
+        rev -> true;
+        _ -> false
     end,
+
     StartKey0 = fabric2_util:get_value(start_key, Options),
     EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
     EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
@@ -1019,17 +985,17 @@ get_dir_and_bounds(DbPrefix, Options) ->
 
     % Set the maximum bounds for the start and endkey
     StartKey2 = case StartKey1 of
-        undefined -> {?DB_ALL_DOCS};
-        SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+        undefined -> <<>>;
+        SK2 -> SK2
     end,
 
     EndKey2 = case EndKey1 of
-        undefined -> {?DB_ALL_DOCS, <<16#FF>>};
-        EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+        undefined -> <<255>>;
+        EK2 -> EK2
     end,
 
-    StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
-    EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+    StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
+    EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
 
     % FoundationDB ranges are applied as SK <= key < EK
     % By default, CouchDB is SK <= key <= EK with the
@@ -1056,26 +1022,46 @@ get_dir_and_bounds(DbPrefix, Options) ->
             EndKey3
     end,
 
-    {Reverse, StartKey4, EndKey4}.
+    Skip = case fabric2_util:get_value(skip, Options) of
+        S when is_integer(S), S >= 0 -> S;
+        _ -> 0
+    end,
 
+    Limit = case fabric2_util:get_value(limit, Options) of
+        L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
+        undefined -> []
+    end,
 
-get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
-    fabric2_util:seq_zero_vs();
+    TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+        T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+        undefined -> []
+    end,
 
-get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
-    fabric2_util:seq_max_vs();
+    StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+        undefined -> [];
+        Name when is_atom(Name) -> [{streaming_mode, Name}]
+    end,
+
+    Snapshot = case fabric2_util:get_value(snapshot, Options) of
+        undefined -> [];
+        B when is_boolean(B) -> [{snapshot, B}]
+    end,
+
+    OutOpts = [{reverse, Reverse}]
+            ++ Limit
+            ++ TargetBytes
+            ++ StreamingMode
+            ++ Snapshot,
+
+    {StartKey4, EndKey4, Skip, OutOpts}.
 
-get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
-    Seq1 = fabric2_util:from_hex(Seq),
-    Seq2 = <<51:8, Seq1/binary>>,
-    {SeqVS} = erlfdb_tuple:unpack(Seq2),
-    SeqVS;
 
-get_since_seq(List) when is_list(List) ->
-    get_since_seq(list_to_binary(List));
+fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
+    NewAcc = Callback(KV, Acc),
+    {skip, 0, Callback, NewAcc};
 
-get_since_seq(Seq) ->
-    erlang:error({invalid_since_seq, Seq}).
+fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
+    {skip, N - 1, Callback, Acc}.
 
 
 get_db_handle() ->
diff --git a/src/fabric/test/fabric2_doc_fold_tests.erl b/src/fabric/test/fabric2_doc_fold_tests.erl
index caa5f92..ee0180f 100644
--- a/src/fabric/test/fabric2_doc_fold_tests.erl
+++ b/src/fabric/test/fabric2_doc_fold_tests.erl
@@ -34,7 +34,10 @@ doc_fold_test_() ->
                 fun fold_docs_with_start_key/1,
                 fun fold_docs_with_end_key/1,
                 fun fold_docs_with_both_keys_the_same/1,
-                fun fold_docs_with_different_keys/1
+                fun fold_docs_with_different_keys/1,
+                fun fold_docs_with_limit/1,
+                fun fold_docs_with_skip/1,
+                fun fold_docs_with_skip_and_limit/1
             ]}
         }
     }.
@@ -50,7 +53,7 @@ setup() ->
             body = {[{<<"value">>, Val}]}
         },
         {ok, Rev} = fabric2_db:update_doc(Db, Doc, []),
-        {DocId, couch_doc:rev_to_str(Rev)}
+        {DocId, {[{rev, couch_doc:rev_to_str(Rev)}]}}
     end, lists:seq(1, ?DOC_COUNT)),
     {Db, lists:sort(DocIdRevs), Ctx}.
 
@@ -108,11 +111,58 @@ fold_docs_with_different_keys({Db, DocIdRevs, _}) ->
     end, lists:seq(1, 500)).
 
 
+fold_docs_with_limit({Db, DocIdRevs, _}) ->
+    lists:foreach(fun(Limit) ->
+        Opts1 = [{limit, Limit}],
+        {ok, {?DOC_COUNT, Rows1}} =
+                fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
+        ?assertEqual(lists:sublist(DocIdRevs, Limit), lists:reverse(Rows1)),
+
+        Opts2 = [{dir, rev} | Opts1],
+        {ok, {?DOC_COUNT, Rows2}} =
+                fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2),
+        ?assertEqual(
+                lists:sublist(lists:reverse(DocIdRevs), Limit),
+                lists:reverse(Rows2)
+            )
+    end, lists:seq(0, 51)).
+
+
+fold_docs_with_skip({Db, DocIdRevs, _}) ->
+    lists:foreach(fun(Skip) ->
+        Opts1 = [{skip, Skip}],
+        {ok, {?DOC_COUNT, Rows1}} =
+                fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
+        Expect1 = case Skip > length(DocIdRevs) of
+            true -> [];
+            false -> lists:nthtail(Skip, DocIdRevs)
+        end,
+        ?assertEqual(Expect1, lists:reverse(Rows1)),
+
+        Opts2 = [{dir, rev} | Opts1],
+        {ok, {?DOC_COUNT, Rows2}} =
+                fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2),
+        Expect2 = case Skip > length(DocIdRevs) of
+            true -> [];
+            false -> lists:nthtail(Skip, lists:reverse(DocIdRevs))
+        end,
+        ?assertEqual(Expect2, lists:reverse(Rows2))
+    end, lists:seq(0, 51)).
+
+
+fold_docs_with_skip_and_limit({Db, DocIdRevs, _}) ->
+    lists:foreach(fun(_) ->
+        check_skip_and_limit(Db, [], DocIdRevs),
+        check_skip_and_limit(Db, [{dir, rev}], lists:reverse(DocIdRevs))
+    end, lists:seq(1, 100)).
+
+
 check_all_combos(Db, StartKey, EndKey, Rows) ->
     Opts1 = make_opts(fwd, StartKey, EndKey, true),
     {ok, {?DOC_COUNT, Rows1}} =
             fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
     ?assertEqual(lists:reverse(Rows), Rows1),
+    check_skip_and_limit(Db, Opts1, Rows),
 
     Opts2 = make_opts(fwd, StartKey, EndKey, false),
     {ok, {?DOC_COUNT, Rows2}} =
@@ -121,11 +171,13 @@ check_all_combos(Db, StartKey, EndKey, Rows) ->
         lists:reverse(all_but_last(Rows))
     end,
     ?assertEqual(Expect2, Rows2),
+    check_skip_and_limit(Db, Opts2, lists:reverse(Expect2)),
 
     Opts3 = make_opts(rev, StartKey, EndKey, true),
     {ok, {?DOC_COUNT, Rows3}} =
             fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts3),
     ?assertEqual(Rows, Rows3),
+    check_skip_and_limit(Db, Opts3, lists:reverse(Rows)),
 
     Opts4 = make_opts(rev, StartKey, EndKey, false),
     {ok, {?DOC_COUNT, Rows4}} =
@@ -133,8 +185,34 @@ check_all_combos(Db, StartKey, EndKey, Rows) ->
     Expect4 = if StartKey == undefined -> Rows; true ->
         tl(Rows)
     end,
-    ?assertEqual(Expect4, Rows4).
+    ?assertEqual(Expect4, Rows4),
+    check_skip_and_limit(Db, Opts4, lists:reverse(Expect4)).
+
+
+check_skip_and_limit(Db, Opts, []) ->
+    Skip = rand:uniform(?DOC_COUNT + 1) - 1,
+    Limit = rand:uniform(?DOC_COUNT + 1) - 1,
+    NewOpts = [{skip, Skip}, {limit, Limit} | Opts],
+    {ok, {?DOC_COUNT, OutRows}} =
+            fabric2_db:fold_docs(Db, fun fold_fun/2, [], NewOpts),
+    ?assertEqual([], OutRows);
+
+check_skip_and_limit(Db, Opts, Rows) ->
+    Skip = rand:uniform(length(Rows) + 1) - 1,
+    Limit = rand:uniform(?DOC_COUNT + 1 - Skip) - 1,
+
+    ExpectRows = case Skip >= length(Rows) of
+        true ->
+            [];
+        false ->
+            lists:sublist(lists:nthtail(Skip, Rows), Limit)
+    end,
 
+    SkipLimitOpts = [{skip, Skip}, {limit, Limit} | Opts],
+    {ok, {?DOC_COUNT, RevRows}} =
+            fabric2_db:fold_docs(Db, fun fold_fun/2, [], SkipLimitOpts),
+    OutRows = lists:reverse(RevRows),
+    ?assertEqual(ExpectRows, OutRows).
 
 
 make_opts(fwd, StartKey, EndKey, InclusiveEnd) ->
diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs
index 9f6aeb6..dab153a 100644
--- a/test/elixir/test/all_docs_test.exs
+++ b/test/elixir/test/all_docs_test.exs
@@ -43,7 +43,8 @@ defmodule AllDocsTest do
     # Check _all_docs offset
     retry_until(fn ->
       resp = Couch.get("/#{db_name}/_all_docs", query: %{:startkey => "\"2\""}).body
-      assert resp["offset"] == 2
+      assert resp["offset"] == :null
+			assert Enum.at(resp["rows"], 0)["key"] == "2"
     end)
 
     # Confirm that queries may assume raw collation


Mime
View raw message