couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 02/02: Implement _all_dbs/_all_docs API parameters
Date Tue, 02 Jul 2019 18:02:57 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-all-dbs-all-docs-qs-params
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d7fef90b110c2c62de1ef3d413296adcc1ba7736
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Wed Jun 19 11:58:47 2019 -0500

    Implement _all_dbs/_all_docs API parameters
    
    This adds the mapping of CouchDB start/end keys and so on to the similar
    yet slightly different concepts in FoundationDB. The handlers for
    `_all_dbs` and `_all_docs` have been udpated to use this new logic.
---
 src/chttpd/src/chttpd_misc.erl |  70 +++++++++------
 src/fabric/src/fabric2_db.erl  | 123 ++++++++++++++++++++++++++-
 src/fabric/src/fabric2_fdb.erl | 188 +++++++++++++++--------------------------
 3 files changed, 232 insertions(+), 149 deletions(-)

diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index b244e84..796ce71 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -108,39 +108,57 @@ maybe_add_csp_headers(Headers, _) ->
     Headers.
 
 handle_all_dbs_req(#httpd{method='GET'}=Req) ->
-    % TODO: Support args and options properly, transform
-    % this back into a fold call similar to the old
-    % version.
-    %% Args = couch_mrview_http:parse_params(Req, undefined),
+    #mrargs{
+        start_key = StartKey,
+        end_key = EndKey,
+        direction = Dir,
+        limit = Limit,
+        skip = Skip
+    } = couch_mrview_http:parse_params(Req, undefined),
+
+    Options = [
+        {start_key, StartKey},
+        {end_key, EndKey},
+        {dir, Dir},
+        {limit, Limit},
+        {skip, Skip}
+    ],
+
     % Eventually the Etag for this request will be derived
     % from the \xFFmetadataVersion key in fdb
     Etag = <<"foo">>,
-    %% Options = [{user_ctx, Req#httpd.user_ctx}],
+
     {ok, Resp} = chttpd:etag_respond(Req, Etag, fun() ->
-        AllDbs = fabric2_db:list_dbs(),
-        chttpd:send_json(Req, AllDbs)
-    end);
+        {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]),
+        Callback = fun all_dbs_callback/2,
+        Acc = #vacc{req=Req,resp=Resp},
+        fabric2_db:list_dbs(Callback, Acc, Options)
+    end),
+    case is_record(Resp, vacc) of
+        true -> {ok, Resp#vacc.resp};
+        _ -> {ok, Resp}
+    end;
 handle_all_dbs_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
-%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
-%%     {ok, Acc#vacc{resp=Resp1}};
-%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
-%%     Prepend = couch_mrview_http:prepend_val(Acc),
-%%     case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
-%%         {ok, Acc};
-%%     DbName ->
-%%         {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
-%%         {ok, Acc#vacc{prepend=",", resp=Resp1}}
-%%     end;
-%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
-%%     {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
-%%     {ok, Acc#vacc{resp=Resp2}};
-%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
-%%     {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
-%%     {ok, Acc#vacc{resp=Resp1}}.
+all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
+    {ok, Acc#vacc{resp=Resp1}};
+all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
+    Prepend = couch_mrview_http:prepend_val(Acc),
+    case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
+        {ok, Acc};
+    DbName ->
+        {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
+        {ok, Acc#vacc{prepend=",", resp=Resp1}}
+    end;
+all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
+    {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+    {ok, Acc#vacc{resp=Resp2}};
+all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
+    {ok, Acc#vacc{resp=Resp1}}.
 
 handle_dbs_info_req(#httpd{method='POST'}=Req) ->
     chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 80028a6..7ec1173 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -20,6 +20,7 @@
 
     list_dbs/0,
     list_dbs/1,
+    list_dbs/3,
 
     is_admin/1,
     check_is_admin/1,
@@ -194,8 +195,40 @@ list_dbs() ->
 
 
 list_dbs(Options) ->
+    Skip = case fabric2_util:get_value(skip, Options, 0) of
+        N when is_integer(N), N >= 0 -> N
+    end,
+    Callback = fun
+        (DbName, {0, Acc}) -> {0, [DbName | Acc]};
+        (_DbName, {S, Acc}) -> {S - 1, Acc}
+    end,
     fabric2_fdb:transactional(fun(Tx) ->
-        fabric2_fdb:list_dbs(Tx, Options)
+        {_, DbNames} = fabric2_fdb:list_dbs(Tx, Callback, {Skip, []}, Options),
+        lists:reverse(DbNames)
+    end).
+
+
+list_dbs(UserFun, UserAcc0, Options) ->
+    Skip = case fabric2_util:get_value(skip, Options, 0) of
+        N when is_integer(N), N >= 0 -> N
+    end,
+    FoldFun = fun
+        (DbName, {0, Acc}) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc));
+        (_DbName, {S, Acc}) -> {S - 1, Acc}
+    end,
+    fabric2_fdb:transactional(fun(Tx) ->
+        try
+            UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
+            UserAcc2 = fabric2_fdb:list_dbs(
+                    Tx,
+                    FoldFun,
+                    {Skip, UserAcc1},
+                    Options
+                ),
+            {ok, maybe_stop(UserFun(complete, UserAcc2))}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -615,9 +648,35 @@ fold_docs(Db, UserFun, UserAcc) ->
     fold_docs(Db, UserFun, UserAcc, []).
 
 
-fold_docs(Db, UserFun, UserAcc, Options) ->
+fold_docs(Db, UserFun, UserAcc0, Options) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
+        try
+            #{
+                db_prefix := DbPrefix
+            } = TxDb,
+
+            Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
+            DocCount = get_doc_count(TxDb),
+
+            UserAcc1 = maybe_stop(UserFun({meta, [
+                {total, DocCount},
+                {offset, null}
+            ]}, UserAcc0)),
+
+            UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+                {DocId} = erlfdb_tuple:unpack(K, Prefix),
+                RevId = erlfdb_tuple:unpack(V),
+                maybe_stop(UserFun({row, [
+                    {id, DocId},
+                    {key, DocId},
+                    {value, couch_doc:rev_to_str(RevId)}
+                ]}, Acc))
+            end, UserAcc1, Options),
+
+            {ok, maybe_stop(UserFun(complete, UserAcc2))}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -627,7 +686,39 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
 
 fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
+        try
+            #{
+                db_prefix := DbPrefix
+            } = TxDb,
+
+            Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+
+            StartKey = get_since_seq(SinceSeq),
+            EndKey = case fabric2_util:get_value(dir, Options, fwd) of
+                rev -> fabric2_util:seq_zero_vs();
+                _ -> fabric2_util:seq_max_vs()
+            end,
+            FoldOpts = [
+                {start_key, StartKey},
+                {end_key, EndKey}
+            ] ++ Options,
+
+            {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+                {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
+                {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
+
+                Change = #{
+                    id => DocId,
+                    sequence => fabric2_fdb:vs_to_seq(SeqVS),
+                    rev_id => RevId,
+                    deleted => Deleted
+                },
+
+                maybe_stop(UserFun(Change, Acc))
+            end, UserAcc, FoldOpts)}
+        catch throw:{stop, FinalUserAcc} ->
+            {ok, FinalUserAcc}
+        end
     end).
 
 
@@ -1289,6 +1380,25 @@ check_duplicate_attachments(#doc{atts = Atts}) ->
     end, ordsets:new(), Atts).
 
 
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
+    fabric2_util:seq_zero_vs();
+
+get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
+    fabric2_util:seq_max_vs();
+
+get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
+    Seq1 = fabric2_util:from_hex(Seq),
+    Seq2 = <<51:8, Seq1/binary>>,
+    {SeqVS} = erlfdb_tuple:unpack(Seq2),
+    fabric2_fdb:next_vs(SeqVS);
+
+get_since_seq(List) when is_list(List) ->
+    get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+    erlang:error({invalid_since_seq, Seq}).
+
+
 get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
     LeafPath;
 get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1353,3 +1463,8 @@ rev(Rev) when is_list(Rev); is_binary(Rev) ->
 rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
     Rev.
 
+
+maybe_stop({ok, Acc}) ->
+    Acc;
+maybe_stop({stop, Acc}) ->
+    throw({stop, Acc}).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 4b01826..5a4d9f9 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,7 +24,7 @@
     delete/1,
     exists/1,
 
-    list_dbs/2,
+    list_dbs/4,
 
     get_info/1,
     get_config/1,
@@ -50,11 +50,12 @@
     read_attachment/3,
     write_attachment/3,
 
-    fold_docs/4,
-    fold_changes/5,
     get_last_change/1,
 
+    fold_range/5,
+
     vs_to_seq/1,
+    next_vs/1,
 
     debug_cluster/0,
     debug_cluster/2
@@ -254,16 +255,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
     end.
 
 
-list_dbs(Tx, _Options) ->
+list_dbs(Tx, Callback, AccIn, Options) ->
     Root = erlfdb_directory:root(),
     CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
     LayerPrefix = erlfdb_directory:get_name(CouchDB),
-    {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
-    Future = erlfdb:get_range(Tx, Start, End),
-    lists:map(fun({K, _V}) ->
-        {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
-        DbName
-    end, erlfdb:wait(Future)).
+    Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+    fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+        {DbName} = erlfdb_tuple:unpack(K, Prefix),
+        Callback(DbName, Acc)
+    end, AccIn, Options).
 
 
 get_info(#{} = Db) ->
@@ -640,84 +640,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
     {ok, AttId}.
 
 
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = ensure_current(Db),
-
-    {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
-
-    DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
-    DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
-
-    try
-        UserAcc1 = maybe_stop(UserFun({meta, [
-            {total, ?bin2uint(DocCountBin)},
-            {offset, null}
-        ]}, UserAcc0)),
-
-        UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
-            RevId = erlfdb_tuple:unpack(V),
-            maybe_stop(UserFun({row, [
-                {id, DocId},
-                {key, DocId},
-                {value, couch_doc:rev_to_str(RevId)}
-            ]}, UserAccIn))
-        end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
-        {ok, maybe_stop(UserFun(complete, UserAcc2))}
-    catch throw:{stop, FinalUserAcc} ->
-        {ok, FinalUserAcc}
-    end.
-
-
-fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = ensure_current(Db),
-
-    SinceSeq1 = get_since_seq(SinceSeq0),
-
-    Reverse = case fabric2_util:get_value(dir, Options, fwd) of
-        fwd -> false;
-        rev -> true
-    end,
-
-    {Start0, End0} = case Reverse of
-        false -> {SinceSeq1, fabric2_util:seq_max_vs()};
-        true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
-    end,
-
-    Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
-    End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
-
-    {Start, End} = case Reverse of
-        false -> {erlfdb_key:first_greater_than(Start1), End1};
-        true -> {Start1, erlfdb_key:first_greater_than(End1)}
-    end,
-
-    try
-        {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
-            {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
-
-            Change = #{
-                id => DocId,
-                sequence => vs_to_seq(SeqVS),
-                rev_id => RevId,
-                deleted => Deleted
-            },
-
-            maybe_stop(UserFun(Change, UserAccIn))
-        end, UserAcc0, [{reverse, Reverse}] ++ Options)}
-    catch throw:{stop, FinalUserAcc} ->
-        {ok, FinalUserAcc}
-    end.
-
-
 get_last_change(#{} = Db) ->
     #{
         tx := Tx,
@@ -735,10 +657,15 @@ get_last_change(#{} = Db) ->
     end.
 
 
-maybe_stop({ok, Acc}) ->
-    Acc;
-maybe_stop({stop, Acc}) ->
-    throw({stop, Acc}).
+fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+    #{
+        tx := Tx
+    } = ensure_current(Db),
+    fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+    {Start, End, FoldOpts} = get_fold_opts(RangePrefix, Options),
+    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts).
 
 
 vs_to_seq(VS) ->
@@ -746,6 +673,21 @@ vs_to_seq(VS) ->
     fabric2_util:to_hex(SeqBin).
 
 
+next_vs({versionstamp, VS, Batch, TxId}) ->
+    {V, B, T} = case TxId =< 65535 of
+        true ->
+            {VS, Batch, TxId + 1};
+        false ->
+            case Batch =< 65535 of
+                true ->
+                    {VS, Batch + 1, 0};
+                false ->
+                    {VS + 1, 0, 0}
+            end
+    end,
+    {versionstamp, V, B, T}.
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
@@ -753,7 +695,7 @@ debug_cluster() ->
 debug_cluster(Start, End) ->
     transactional(fun(Tx) ->
         lists:foreach(fun({Key, Val}) ->
-            io:format("~s => ~s~n", [
+            io:format(standard_error, "~s => ~s~n", [
                     string:pad(erlfdb_util:repr(Key), 60),
                     erlfdb_util:repr(Val)
                 ])
@@ -790,7 +732,7 @@ load_validate_doc_funs(#{} = Db) ->
         {end_key, <<"_design0">>}
     ],
 
-    {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+    {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
 
     Infos2 = lists:map(fun(Info) ->
         #{
@@ -999,11 +941,12 @@ chunkify_attachment(Data) ->
     end.
 
 
-get_dir_and_bounds(DbPrefix, Options) ->
-    Reverse = case fabric2_util:get_value(dir, Options, fwd) of
-        fwd -> false;
-        rev -> true
+get_fold_opts(RangePrefix, Options) ->
+    Reverse = case fabric2_util:get_value(dir, Options) of
+        rev -> true;
+        _ -> false
     end,
+
     StartKey0 = fabric2_util:get_value(start_key, Options),
     EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
     EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
@@ -1019,17 +962,17 @@ get_dir_and_bounds(DbPrefix, Options) ->
 
     % Set the maximum bounds for the start and endkey
     StartKey2 = case StartKey1 of
-        undefined -> {?DB_ALL_DOCS};
-        SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+        undefined -> <<>>;
+        SK2 -> SK2
     end,
 
     EndKey2 = case EndKey1 of
-        undefined -> {?DB_ALL_DOCS, <<16#FF>>};
-        EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+        undefined -> <<255>>;
+        EK2 -> EK2
     end,
 
-    StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
-    EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+    StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
+    EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
 
     % FoundationDB ranges are applied as SK <= key < EK
     % By default, CouchDB is SK <= key <= EK with the
@@ -1056,26 +999,33 @@ get_dir_and_bounds(DbPrefix, Options) ->
             EndKey3
     end,
 
-    {Reverse, StartKey4, EndKey4}.
-
+    Limit = case fabric2_util:get_value(limit, Options) of
+        L when is_integer(L), L >= 0 -> [{limit, L}];
+        undefined -> []
+    end,
 
-get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
-    fabric2_util:seq_zero_vs();
+    TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+        T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+        undefined -> []
+    end,
 
-get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
-    fabric2_util:seq_max_vs();
+    StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+        undefined -> [];
+        Name when is_atom(Name) -> [{streaming_mode, Name}]
+    end,
 
-get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
-    Seq1 = fabric2_util:from_hex(Seq),
-    Seq2 = <<51:8, Seq1/binary>>,
-    {SeqVS} = erlfdb_tuple:unpack(Seq2),
-    SeqVS;
+    Snapshot = case fabric2_util:get_value(snapshot, Options) of
+        undefined -> [];
+        B when is_boolean(B) -> [{snapshot, B}]
+    end,
 
-get_since_seq(List) when is_list(List) ->
-    get_since_seq(list_to_binary(List));
+    OutOpts = [{reverse, Reverse}]
+            ++ Limit
+            ++ TargetBytes
+            ++ StreamingMode
+            ++ Snapshot,
 
-get_since_seq(Seq) ->
-    erlang:error({invalid_since_seq, Seq}).
+    {StartKey4, EndKey4, OutOpts}.
 
 
 get_db_handle() ->


Mime
View raw message