couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiangp...@apache.org
Subject [couchdb] 02/10: [02/10] Clustered Purge: Update single node APIs
Date Tue, 21 Aug 2018 01:38:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4d695f1ec9e309f79a1c9e2a48e3ce4dc5e2ee2b
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Tue Apr 24 12:24:10 2018 -0500

    [02/10] Clustered Purge: Update single node APIs
    
    This patch updates the single node API implementations for use with the
    new clustered purge API. At the single node level the major change is to
    store a history of purge requests that can then be consumed by various
    other parts of the database system.
    
    The simpler of the major areas to use this new functionality will be any
    secondary indices. Rather than checking that only a single purge request
    has occurred each secondary index will store a _local document
    referencing its oldest processed purge request. During index updates
    each secondary index implementation will process any new purge requests
    and update its local doc checkpoint. In this way secondary indexes will
    no longer be sensitive to reset when multiple purge requests are issued
    against the database.
    
    The two other major areas that will make use of the newly stored purge
    request history are both of the anit-entropy mechanisms: read-repair and
    internal replication.
    
    Read-repair will use the purge request history to know when a node
    should discard updates that have come from a node that has not yet
    processed a purge request during internal replication. Otherwise
    read-repair would effectively undo any purge replication that happened
    "recently".
    
    Internal replication will use the purge request history to be able to
    mend any differences between shards. For instance, if a shard is down
    when a purge request is issue against a cluster this process will pull
    the purge request and apply it during internal replication. And
    similarly any local purge requests will be applied on the target before
    normal internal replication.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
    Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
---
 src/couch/priv/stats_descriptions.cfg |  12 +++
 src/couch/src/couch_db.erl            | 157 +++++++++++++++++++++++++++--
 src/couch/src/couch_db_plugin.erl     |   6 ++
 src/couch/src/couch_db_updater.erl    | 185 +++++++++++++++++++---------------
 src/couch/src/couch_httpd_db.erl      |  23 +++--
 5 files changed, 284 insertions(+), 99 deletions(-)

diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f091978..bceb0ce 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
     {type, counter},
     {desc, <<"number of times a document was read from a database">>}
 ]}.
+{[couchdb, database_purges], [
+    {type, counter},
+    {desc, <<"number of times a database was purged">>}
+]}.
 {[couchdb, db_open_time], [
     {type, histogram},
     {desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
     {type, counter},
     {desc, <<"number of document write operations">>}
 ]}.
+{[couchdb, document_purges], [
+    {type, counter},
+    {desc, <<"number of document purge operations">>}
+]}.
 {[couchdb, local_document_writes], [
     {type, counter},
     {desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
     {type, counter},
     {desc, <<"number of clients for continuous _changes">>}
 ]}.
+{[couchdb, httpd, purge_requests], [
+    {type, counter},
+    {desc, <<"number of purge requests">>}
+]}.
 {[couchdb, httpd_request_methods, 'COPY'], [
     {type, counter},
     {desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 40c673a..8e932b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
     get_epochs/1,
     get_filepath/1,
     get_instance_start_time/1,
-    get_last_purged/1,
     get_pid/1,
     get_revs_limit/1,
     get_security/1,
@@ -51,12 +50,15 @@
     get_user_ctx/1,
     get_uuid/1,
     get_purge_seq/1,
+    get_oldest_purge_seq/1,
+    get_purge_infos_limit/1,
 
     is_db/1,
     is_system_db/1,
     is_clustered/1,
 
     set_revs_limit/2,
+    set_purge_infos_limit/2,
     set_security/2,
     set_user_ctx/2,
 
@@ -75,6 +77,10 @@
     get_full_doc_infos/2,
     get_missing_revs/2,
     get_design_docs/1,
+    get_purge_infos/2,
+
+    get_minimum_purge_seq/1,
+    purge_client_exists/3,
 
     update_doc/3,
     update_doc/4,
@@ -84,6 +90,7 @@
     delete_doc/3,
 
     purge_docs/2,
+    purge_docs/3,
 
     with_stream/3,
     open_write_stream/2,
@@ -97,6 +104,8 @@
     fold_changes/4,
     fold_changes/5,
     count_changes_since/2,
+    fold_purge_infos/4,
+    fold_purge_infos/5,
 
     calculate_start_seq/3,
     owner_of/2,
@@ -369,8 +378,129 @@ get_full_doc_info(Db, Id) ->
 get_full_doc_infos(Db, Ids) ->
     couch_db_engine:open_docs(Db, Ids).
 
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
-    gen_server:call(Pid, {purge_docs, IdsRevs}).
+purge_docs(Db, IdRevs) ->
+    purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+    {ok, [Reply]} when
+    UUId :: binary(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()},
+    PurgeOption :: interactive_edit | replicated_changes,
+    Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
+    % Check here if any UUIDs already exist when
+    % we're not replicating purge infos
+    IsRepl = lists:member(replicated_changes, Options),
+    if IsRepl -> ok; true ->
+        UUIDs = [UUID || {UUID, _, _} <- UUIDsIdsRevs],
+        lists:foreach(fun(Resp) ->
+            if Resp == not_found -> ok; true ->
+                Fmt = "Duplicate purge info UIUD: ~s",
+                Reason = io_lib:format(Fmt, [element(2, Resp)]),
+                throw({badreq, Reason})
+            end
+        end, get_purge_infos(Db, UUIDs))
+    end,
+    increment_stat(Db, [couchdb, database_purges]),
+    gen_server:call(Pid, {purge_docs, UUIDsIdsRevs, Options}).
+
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+    UUId :: binary(),
+    PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+    PurgeSeq :: non_neg_integer(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+    couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+get_minimum_purge_seq(#db{} = Db) ->
+    PurgeSeq = couch_db_engine:get_purge_seq(Db),
+    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+        case DocId of
+            <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+                ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case ClientSeq of
+                    CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+                        {ok, SeqAcc};
+                    CS when is_integer(CS) ->
+                        case purge_client_exists(Db, DocId, Props) of
+                            true -> {ok, erlang:min(CS, SeqAcc)};
+                            false -> {ok, SeqAcc}
+                        end;
+                    _ ->
+                        % If there's a broken doc we have to keep every
+                        % purge info until the doc is fixed or removed.
+                        Fmt = "Invalid purge doc '~s' on database ~p
+                            with purge_seq '~w'",
+                        DbName = couch_db:name(Db),
+                        couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
+                        {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+                end;
+            _ ->
+                {stop, SeqAcc}
+        end
+    end,
+    InitMinSeq = PurgeSeq - PurgeInfosLimit,
+    Opts = [
+        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+    ],
+    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+    FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+        true -> MinIdxSeq;
+        false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+    end,
+    % Log a warning if we've got a purge sequence exceeding the
+    % configured threshold.
+    if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+        Fmt = "The purge sequence for '~s' exceeds configured threshold",
+        couch_log:warning(Fmt, [couch_db:name(Db)])
+    end,
+    FinalSeq.
+
+
+purge_client_exists(DbName, DocId, Props) ->
+    % Warn about clients that have not updated their purge
+    % checkpoints in the last "index_lag_warn_seconds"
+    LagWindow = config:get_integer(
+            "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    LagThreshold = NowSecs - LagWindow,
+
+    try
+        Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
+        if not Exists -> ok; true ->
+            Updated = couch_util:get_value(<<"updated_on">>, Props),
+            if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
+                Diff = NowSecs - Updated,
+                Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds
+                    in database ~p",
+                couch_log:error(Fmt1, [DocId, Diff, DbName])
+            end
+        end,
+        Exists
+    catch _:_ ->
+        % If we fail to check for a client we have to assume that
+        % it exists.
+        Fmt2 = "Failed to check purge checkpoint using
+            document '~p' in database ~p",
+        couch_log:error(Fmt2, [DbName, DocId]),
+        true
+    end.
+
+
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+    throw(invalid_purge_infos_limit).
+
 
 get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
     Fun.
@@ -390,10 +520,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
     ?OLD_DB_USER_CTX(Db).
 
 get_purge_seq(#db{}=Db) ->
-    {ok, couch_db_engine:get_purge_seq(Db)}.
+    couch_db_engine:get_purge_seq(Db).
+
+get_oldest_purge_seq(#db{}=Db) ->
+    couch_db_engine:get_oldest_purge_seq(Db).
 
-get_last_purged(#db{}=Db) ->
-    {ok, couch_db_engine:get_last_purged(Db)}.
+get_purge_infos_limit(#db{}=Db) ->
+    couch_db_engine:get_purge_infos_limit(Db).
 
 get_pid(#db{main_pid = Pid}) ->
     Pid.
@@ -471,7 +604,8 @@ get_db_info(Db) ->
     ],
     {ok, InfoList}.
 
-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
+    DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
     {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
     receive {'DOWN', Ref, _, _, Response} ->
         Response
@@ -481,7 +615,6 @@ get_design_docs(#db{} = Db) ->
     {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
     {ok, lists:reverse(Docs)}.
 
-
 check_is_admin(#db{user_ctx=UserCtx}=Db) ->
     case is_admin(Db) of
         true -> ok;
@@ -1400,6 +1533,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
     couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
 
 
+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+    fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+    couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..e25866e 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
     after_doc_read/2,
     validate_docid/1,
     check_is_admin/1,
+    is_valid_purge_client/2,
     on_compact/2,
     on_delete/2
 ]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
     %% callbacks return true only if it specifically allow the given Id
     couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
 
+is_valid_purge_client(DbName, Props) ->
+    Handle = couch_epi:get_handle(?SERVICE_ID),
+    %% callbacks return true only if it specifically allow the given Id
+    couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [DbName, Props], []).
+
 on_compact(DbName, DDocs) ->
     Handle = couch_epi:get_handle(?SERVICE_ID),
     couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 40e836a..52a4d2f 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,28 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
     ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
     {reply, ok, Db3, idle_limit()};
 
-handle_call({purge_docs, _IdRevs}, _From,
-        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
-    DocIds = [Id || {Id, _Revs} <- IdRevs],
-    OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
-    NewDocInfos = lists:flatmap(fun
-        ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
-            case couch_key_tree:remove_leafs(Tree, Revs) of
-                {_, [] = _RemovedRevs} -> % no change
-                    [];
-                {NewTree, RemovedRevs} ->
-                    NewFDI = FDI#full_doc_info{rev_tree = NewTree},
-                    [{FDI, NewFDI, RemovedRevs}]
-            end;
-        ({_, not_found}) ->
-            []
-    end, lists:zip(IdRevs, OldDocInfos)),
-
-    InitUpdateSeq = couch_db_engine:get_update_seq(Db),
-    InitAcc = {InitUpdateSeq, [], []},
-    FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
-        #full_doc_info{
-            id = Id,
-            rev_tree = OldTree
-        } = OldFDI,
-        {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
-        {NewFDIAcc, NewSeqAcc} = case OldTree of
-            [] ->
-                % If we purged every #leaf{} in the doc record
-                % then we're removing it completely from the
-                % database.
-                {FDIAcc, SeqAcc0};
-            _ ->
-                % Its possible to purge the #leaf{} that contains
-                % the update_seq where this doc sits in the update_seq
-                % sequence. Rather than do a bunch of complicated checks
-                % we just re-label every #leaf{} and reinsert it into
-                % the update_seq sequence.
-                {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
-                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                        {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
-                    (_RevId, Value, _Type, InnerSeqAcc) ->
-                        {Value, InnerSeqAcc}
-                end, SeqAcc0, OldTree),
-
-                NewFDI = OldFDI#full_doc_info{
-                    update_seq = SeqAcc1,
-                    rev_tree = NewTree
-                },
-
-                {[NewFDI | FDIAcc], SeqAcc1}
-        end,
-        NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
-        {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
-    end, InitAcc, NewDocInfos),
-
-    {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
-    % We need to only use the list of #full_doc_info{} records
-    % that we have actually changed due to a purge.
-    PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
-    Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
-    {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-    Db3 = commit_data(Db2),
-    ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
-    couch_event:notify(Db#db.name, updated),
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    {reply, ok, Db2, idle_limit()};
 
-    PurgeSeq = couch_db_engine:get_purge_seq(Db3),
-    {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+handle_call({purge_docs, [], _}, _From, Db) ->
+    {reply, {ok, []}, Db, idle_limit()};
+
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+    % Filter out any previously applied updates during
+    % internal replication
+    IsRepl = lists:member(replicated_changes, Options),
+    PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+        PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+        lists:flatmap(fun
+            ({not_found, PReq}) -> [PReq];
+            ({{_, _, _, _}, _}) -> []
+        end, lists:zip(PurgeInfos, PurgeReqs0))
+    end,
+    {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
+    {reply, {ok, Replies}, NewDb, idle_limit()};
 
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -656,7 +605,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
     LocalDocs2 = update_local_doc_revs(LocalDocs),
 
-    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
 
     WriteCount = length(IndexFDIs),
     couch_stats:increment_counter([couchdb, document_inserts],
@@ -702,6 +651,87 @@ update_local_doc_revs(Docs) ->
     end, Docs).
 
 
+purge_docs(Db, []) ->
+    {ok, Db, []};
+
+purge_docs(Db, PurgeReqs) ->
+    Ids = lists:usort(lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs)),
+    FDIs = couch_db_engine:open_docs(Db, Ids),
+    USeq = couch_db_engine:get_update_seq(Db),
+
+    IdFDIs = lists:zip(Ids, FDIs),
+    {NewIdFDIs, Replies} = apply_purge_reqs(PurgeReqs, IdFDIs, USeq, []),
+
+    Pairs = lists:flatmap(fun({DocId, OldFDI}) ->
+        {DocId, NewFDI} = lists:keyfind(DocId, 1, NewIdFDIs),
+        case {OldFDI, NewFDI} of
+            {not_found, not_found} ->
+                [];
+            {#full_doc_info{} = A, #full_doc_info{} = A} ->
+                [];
+            {#full_doc_info{}, _} ->
+                [{OldFDI, NewFDI}]
+        end
+    end, IdFDIs),
+
+    PSeq = couch_db_engine:get_purge_seq(Db),
+    {RevPInfos, _} = lists:foldl(fun({UUID, DocId, Revs}, {PIAcc, PSeqAcc}) ->
+        Info = {PSeqAcc + 1, UUID, DocId, Revs},
+        {[Info | PIAcc], PSeqAcc + 1}
+    end, {[], PSeq}, PurgeReqs),
+    PInfos = lists:reverse(RevPInfos),
+
+    {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+    Db2 = commit_data(Db1),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    couch_event:notify(Db2#db.name, updated),
+    {ok, Db2, Replies}.
+
+
+apply_purge_reqs([], IdFDIs, _USeq, Replies) ->
+    {IdFDIs, lists:reverse(Replies)};
+
+apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) ->
+    {_UUID, DocId, Revs} = Req,
+    {value, {_, FDI0}, RestIdFDIs} = lists:keytake(DocId, 1, IdFDIs),
+    {NewFDI, RemovedRevs, NewUSeq} = case FDI0 of
+        #full_doc_info{rev_tree = Tree} ->
+            case couch_key_tree:remove_leafs(Tree, Revs) of
+                {_, []} ->
+                    % No change
+                    {FDI0, [], USeq};
+                {[], Removed} ->
+                    % Completely purged
+                    {not_found, Removed, USeq};
+                {NewTree, Removed} ->
+                    % Its possible to purge the #leaf{} that contains
+                    % the update_seq where this doc sits in the
+                    % update_seq sequence. Rather than do a bunch of
+                    % complicated checks we just re-label every #leaf{}
+                    % and reinsert it into the update_seq sequence.
+                    {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+                        (_RevId, Leaf, leaf, SeqAcc) ->
+                            {Leaf#leaf{seq = SeqAcc + 1},
+                                SeqAcc + 1};
+                        (_RevId, Value, _Type, SeqAcc) ->
+                            {Value, SeqAcc}
+                    end, USeq, NewTree),
+
+                    FDI1 = FDI0#full_doc_info{
+                        update_seq = NewUpdateSeq,
+                        rev_tree = NewTree2
+                    },
+                    {FDI1, Removed, NewUpdateSeq}
+            end;
+        not_found ->
+            % Not found means nothing to change
+            {not_found, [], USeq}
+    end,
+    NewIdFDIs = [{DocId, NewFDI} | RestIdFDIs],
+    NewReplies = [{ok, RemovedRevs} | Replies],
+    apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies).
+
+
 commit_data(Db) ->
     commit_data(Db, false).
 
@@ -731,15 +761,6 @@ pair_write_info(Old, New) ->
     end, New).
 
 
-pair_purge_info(Old, New) ->
-    lists:map(fun(OldFDI) ->
-        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
-            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
-            false -> {OldFDI, not_found}
-        end
-    end, Old).
-
-
 get_meta_body_size(Meta) ->
     {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
     ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192..81209d9 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db)
->
     send_method_not_allowed(Req, "POST");
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
     couch_httpd:validate_ctype(Req, "application/json"),
-    {IdsRevs} = couch_httpd:json_body_obj(Req),
-    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+    {IdRevs} = couch_httpd:json_body_obj(Req),
+    PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+        {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+    end, IdRevs),
 
-    case couch_db:purge_docs(Db, IdsRevs2) of
-    {ok, PurgeSeq, PurgedIdsRevs} ->
-        PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
-        send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>,
{PurgedIdsRevs2}}]});
-    Error ->
-        throw(Error)
-    end;
+    {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+    Results = lists:zipwith(fun({Id, _}, {ok, Reply}) ->
+        {Id, couch_doc:revs_to_strs(Reply)}
+    end, IdRevs, Replies),
+
+    {ok, Db2} = couch_db:reopen(Db),
+    PurgeSeq = couch_db:get_purge_seq(Db2),
+    send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});
 
 db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");


Mime
View raw message