From: jiangphcn@apache.org
To: commits@couchdb.apache.org
Reply-To: dev@couchdb.apache.org
Date: Tue, 21 Aug 2018 01:38:01 +0000
Subject: [couchdb] 02/10: Clustered Purge: Update single node APIs
Message-Id: <20180821013759.4C43E853E6@gitbox.apache.org>
In-Reply-To: <153481547908.26557.1312947669371349763@gitbox.apache.org>
References: <153481547908.26557.1312947669371349763@gitbox.apache.org>

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4d695f1ec9e309f79a1c9e2a48e3ce4dc5e2ee2b
Author: Paul J. Davis
AuthorDate: Tue Apr 24 12:24:10 2018 -0500

    [02/10] Clustered Purge: Update single node APIs

    This patch updates the single node API implementations for use with
    the new clustered purge API. At the single node level the major
    change is to store a history of purge requests that can then be
    consumed by various other parts of the database system.

    The simpler of the major areas to use this new functionality will be
    any secondary indices. Rather than checking that only a single purge
    request has occurred, each secondary index will store a _local
    document referencing its oldest processed purge request. During
    index updates each secondary index implementation will process any
    new purge requests and update its local doc checkpoint. In this way
    secondary indexes will no longer be sensitive to reset when multiple
    purge requests are issued against the database.

    The two other major areas that will make use of the newly stored
    purge request history are both of the anti-entropy mechanisms:
    read-repair and internal replication.

    Read-repair will use the purge request history to know when a node
    should discard updates that have come from a node that has not yet
    processed a purge request during internal replication. Otherwise
    read-repair would effectively undo any purge replication that
    happened "recently".

    Internal replication will use the purge request history to be able
    to mend any differences between shards. For instance, if a shard is
    down when a purge request is issued against a cluster, this process
    will pull the purge request and apply it during internal
    replication. Similarly, any local purge requests will be applied on
    the target before normal internal replication.

    COUCHDB-3326

    Co-authored-by: Mayya Sharipova
    Co-authored-by: jiangphcn
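As context for readers of the diff below: the checkpoint documents described
above are ordinary _local docs. The only body fields this patch actually reads
back are "purge_seq" and "updated_on" (see get_minimum_purge_seq/1 and
purge_client_exists/3 in couch_db.erl); everything else in this sketch,
including the doc id and the "type" field, is illustrative rather than part of
the patch.

    %% Minimal sketch of a purge checkpoint doc (#doc{} from couch_db.hrl).
    %% Only "purge_seq" and "updated_on" are read by this patch; the
    %% "_local/purge-" id prefix is what get_minimum_purge_seq/1 scans.
    #doc{
        id = <<"_local/purge-mrview-123abc">>,
        body = {[
            {<<"type">>, <<"mrview">>},         % hypothetical client tag
            {<<"purge_seq">>, 12},              % last purge_seq processed
            {<<"updated_on">>, 1524590650}      % unix seconds of checkpoint
        ]}
    }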
---
 src/couch/priv/stats_descriptions.cfg |  12 +++
 src/couch/src/couch_db.erl            | 157 +++++++++++++++++++++++++++--
 src/couch/src/couch_db_plugin.erl     |   6 ++
 src/couch/src/couch_db_updater.erl    | 185 +++++++++++++++++++---------------
 src/couch/src/couch_httpd_db.erl      |  23 +++--
 5 files changed, 284 insertions(+), 99 deletions(-)

diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f091978..bceb0ce 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
     {type, counter},
     {desc, <<"number of times a document was read from a database">>}
 ]}.
+{[couchdb, database_purges], [
+    {type, counter},
+    {desc, <<"number of times a database was purged">>}
+]}.
 {[couchdb, db_open_time], [
     {type, histogram},
     {desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
     {type, counter},
     {desc, <<"number of document write operations">>}
 ]}.
+{[couchdb, document_purges], [
+    {type, counter},
+    {desc, <<"number of document purge operations">>}
+]}.
 {[couchdb, local_document_writes], [
     {type, counter},
     {desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
     {type, counter},
     {desc, <<"number of clients for continuous _changes">>}
 ]}.
+{[couchdb, httpd, purge_requests], [
+    {type, counter},
+    {desc, <<"number of purge requests">>}
+]}.
 {[couchdb, httpd_request_methods, 'COPY'], [
     {type, counter},
     {desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 40c673a..8e932b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
     get_epochs/1,
     get_filepath/1,
     get_instance_start_time/1,
-    get_last_purged/1,
     get_pid/1,
     get_revs_limit/1,
     get_security/1,
@@ -51,12 +50,15 @@
     get_user_ctx/1,
     get_uuid/1,
     get_purge_seq/1,
+    get_oldest_purge_seq/1,
+    get_purge_infos_limit/1,

     is_db/1,
     is_system_db/1,
     is_clustered/1,

     set_revs_limit/2,
+    set_purge_infos_limit/2,
     set_security/2,
     set_user_ctx/2,
@@ -75,6 +77,10 @@
     get_full_doc_infos/2,
     get_missing_revs/2,
     get_design_docs/1,
+    get_purge_infos/2,
+
+    get_minimum_purge_seq/1,
+    purge_client_exists/3,

     update_doc/3,
     update_doc/4,
@@ -84,6 +90,7 @@
     delete_doc/3,

     purge_docs/2,
+    purge_docs/3,

     with_stream/3,
     open_write_stream/2,
@@ -97,6 +104,8 @@
     fold_changes/4,
     fold_changes/5,
     count_changes_since/2,
+    fold_purge_infos/4,
+    fold_purge_infos/5,

     calculate_start_seq/3,
     owner_of/2,
@@ -369,8 +378,129 @@ get_full_doc_info(Db, Id) ->
 get_full_doc_infos(Db, Ids) ->
     couch_db_engine:open_docs(Db, Ids).

-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
-    gen_server:call(Pid, {purge_docs, IdsRevs}).
+purge_docs(Db, IdRevs) ->
+    purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+        {ok, [Reply]} when
+    UUId :: binary(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()},
+    PurgeOption :: interactive_edit | replicated_changes,
+    Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
+    % Check here if any UUIDs already exist when
+    % we're not replicating purge infos
+    IsRepl = lists:member(replicated_changes, Options),
+    if IsRepl -> ok; true ->
+        UUIDs = [UUID || {UUID, _, _} <- UUIDsIdsRevs],
+        lists:foreach(fun(Resp) ->
+            if Resp == not_found -> ok; true ->
+                Fmt = "Duplicate purge info UUID: ~s",
+                Reason = io_lib:format(Fmt, [element(2, Resp)]),
+                throw({badreq, Reason})
+            end
+        end, get_purge_infos(Db, UUIDs))
+    end,
+    increment_stat(Db, [couchdb, database_purges]),
+    gen_server:call(Pid, {purge_docs, UUIDsIdsRevs, Options}).
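A usage sketch, not part of the patch: driving the new arity-3 API directly
from Erlang. couch_db:open_int/2 and couch_uuids:new/0 are existing CouchDB
functions; the database name, doc id, and revision are made up, and the reply
shape follows the -spec above.

    {ok, Db} = couch_db:open_int(<<"mydb">>, []),
    UUID = couch_uuids:new(),
    Rev = {1, <<"767138f6af521c456b6b0ecb59db1f72">>},
    % One reply per request; PurgedRevs lists the revisions actually removed.
    {ok, [{ok, PurgedRevs}]} =
        couch_db:purge_docs(Db, [{UUID, <<"mydoc">>, [Rev]}], []).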
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+    UUId :: binary(),
+    PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+    PurgeSeq :: non_neg_integer(),
+    Id :: binary(),
+    Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+    couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+get_minimum_purge_seq(#db{} = Db) ->
+    PurgeSeq = couch_db_engine:get_purge_seq(Db),
+    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+        case DocId of
+            <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+                ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case ClientSeq of
+                    CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+                        {ok, SeqAcc};
+                    CS when is_integer(CS) ->
+                        case purge_client_exists(Db, DocId, Props) of
+                            true -> {ok, erlang:min(CS, SeqAcc)};
+                            false -> {ok, SeqAcc}
+                        end;
+                    _ ->
+                        % If there's a broken doc we have to keep every
+                        % purge info until the doc is fixed or removed.
+                        Fmt = "Invalid purge doc '~s' on database ~p with purge_seq '~w'",
+                        DbName = couch_db:name(Db),
+                        couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
+                        {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+                end;
+            _ ->
+                {stop, SeqAcc}
+        end
+    end,
+    InitMinSeq = PurgeSeq - PurgeInfosLimit,
+    Opts = [
+        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+    ],
+    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+    FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+        true -> MinIdxSeq;
+        false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+    end,
+    % Log a warning if we've got a purge sequence exceeding the
+    % configured threshold.
+    if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+        Fmt = "The purge sequence for '~s' exceeds configured threshold",
+        couch_log:warning(Fmt, [couch_db:name(Db)])
+    end,
+    FinalSeq.
+
+
+purge_client_exists(DbName, DocId, Props) ->
+    % Warn about clients that have not updated their purge
+    % checkpoints in the last "index_lag_warn_seconds"
+    LagWindow = config:get_integer(
+            "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    LagThreshold = NowSecs - LagWindow,
+
+    try
+        Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
+        if not Exists -> ok; true ->
+            Updated = couch_util:get_value(<<"updated_on">>, Props),
+            if is_integer(Updated) andalso Updated > LagThreshold -> ok; true ->
+                Diff = NowSecs - Updated,
+                Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds in database ~p",
+                couch_log:error(Fmt1, [DocId, Diff, DbName])
+            end
+        end,
+        Exists
+    catch _:_ ->
+        % If we fail to check for a client we have to assume that
+        % it exists.
+        Fmt2 = "Failed to check purge checkpoint using document '~p' in database ~p",
+        couch_log:error(Fmt2, [DocId, DbName]),
+        true
+    end.
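Tying the two functions above together: a typical secondary index would fold
any purge requests newer than its checkpoint, then rewrite its _local doc with
a fresh purge_seq and updated_on so that get_minimum_purge_seq/1 can advance.
A sketch, assuming fold_purge_infos/4 (added further down) hands the callback
{PurgeSeq, UUID, DocId, Revs} tuples matching the PurgeInfo type above;
apply_purge_to_index/3 and the doc id are hypothetical placeholders.

    catch_up_and_checkpoint(Db, IdxState0, CheckpointSeq) ->
        FoldFun = fun({_PSeq, _UUID, DocId, Revs}, IdxState) ->
            {ok, apply_purge_to_index(IdxState, DocId, Revs)}
        end,
        {ok, IdxState1} =
            couch_db:fold_purge_infos(Db, CheckpointSeq, FoldFun, IdxState0),
        {Mega, Secs, _} = os:timestamp(),
        Body = {[
            {<<"purge_seq">>, couch_db:get_purge_seq(Db)},
            {<<"updated_on">>, Mega * 1000000 + Secs}
        ]},
        CheckpointDoc = #doc{id = <<"_local/purge-myindex">>, body = Body},
        {ok, _} = couch_db:update_doc(Db, CheckpointDoc, []),
        IdxState1.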
+
+
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+    throw(invalid_purge_infos_limit).
+

 get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
     Fun.
@@ -390,10 +520,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
     ?OLD_DB_USER_CTX(Db).

 get_purge_seq(#db{}=Db) ->
-    {ok, couch_db_engine:get_purge_seq(Db)}.
+    couch_db_engine:get_purge_seq(Db).
+
+get_oldest_purge_seq(#db{}=Db) ->
+    couch_db_engine:get_oldest_purge_seq(Db).

-get_last_purged(#db{}=Db) ->
-    {ok, couch_db_engine:get_last_purged(Db)}.
+get_purge_infos_limit(#db{}=Db) ->
+    couch_db_engine:get_purge_infos_limit(Db).

 get_pid(#db{main_pid = Pid}) ->
     Pid.
@@ -471,7 +604,8 @@ get_db_info(Db) ->
     ],
     {ok, InfoList}.

-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
+    DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
     {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
     receive {'DOWN', Ref, _, _, Response} ->
         Response
@@ -481,7 +615,6 @@ get_design_docs(#db{} = Db) ->
     {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
     {ok, lists:reverse(Docs)}.

-
 check_is_admin(#db{user_ctx=UserCtx}=Db) ->
     case is_admin(Db) of
         true -> ok;
@@ -1400,6 +1533,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
     couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).


+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+    fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+    couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..e25866e 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
     after_doc_read/2,
     validate_docid/1,
     check_is_admin/1,
+    is_valid_purge_client/2,
    on_compact/2,
    on_delete/2
 ]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
     %% callbacks return true only if it specifically allow the given Id
     couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).

+is_valid_purge_client(DbName, Props) ->
+    Handle = couch_epi:get_handle(?SERVICE_ID),
+    %% callbacks return true only if one specifically allows the given client
+    couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [DbName, Props], []).
+
 on_compact(DbName, DDocs) ->
     Handle = couch_epi:get_handle(?SERVICE_ID),
     couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
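For plugin authors, the contract of the new hook is simply: return true if the
checkpoint doc described by Props belongs to a still-live purge client of
yours. A minimal, hypothetical provider module (couch_epi registration
omitted); the "type" key is an assumed convention, not mandated by this patch.

    -module(my_purge_plugin).
    -export([is_valid_purge_client/2]).

    % Claim checkpoint docs written by our own indexes so that
    % get_minimum_purge_seq/1 keeps their purge history around.
    is_valid_purge_client(_DbName, Props) ->
        couch_util:get_value(<<"type">>, Props) =:= <<"my_index_type">>.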
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 40e836a..52a4d2f 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,28 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
     ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
     {reply, ok, Db3, idle_limit()};

-handle_call({purge_docs, _IdRevs}, _From,
-        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
-    DocIds = [Id || {Id, _Revs} <- IdRevs],
-    OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
-    NewDocInfos = lists:flatmap(fun
-        ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
-            case couch_key_tree:remove_leafs(Tree, Revs) of
-                {_, [] = _RemovedRevs} -> % no change
-                    [];
-                {NewTree, RemovedRevs} ->
-                    NewFDI = FDI#full_doc_info{rev_tree = NewTree},
-                    [{FDI, NewFDI, RemovedRevs}]
-            end;
-        ({_, not_found}) ->
-            []
-    end, lists:zip(IdRevs, OldDocInfos)),
-
-    InitUpdateSeq = couch_db_engine:get_update_seq(Db),
-    InitAcc = {InitUpdateSeq, [], []},
-    FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
-        #full_doc_info{
-            id = Id,
-            rev_tree = OldTree
-        } = OldFDI,
-        {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
-        {NewFDIAcc, NewSeqAcc} = case OldTree of
-            [] ->
-                % If we purged every #leaf{} in the doc record
-                % then we're removing it completely from the
-                % database.
-                {FDIAcc, SeqAcc0};
-            _ ->
-                % Its possible to purge the #leaf{} that contains
-                % the update_seq where this doc sits in the update_seq
-                % sequence. Rather than do a bunch of complicated checks
-                % we just re-label every #leaf{} and reinsert it into
-                % the update_seq sequence.
-                {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
-                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                        {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
-                    (_RevId, Value, _Type, InnerSeqAcc) ->
-                        {Value, InnerSeqAcc}
-                end, SeqAcc0, OldTree),
-
-                NewFDI = OldFDI#full_doc_info{
-                    update_seq = SeqAcc1,
-                    rev_tree = NewTree
-                },
-
-                {[NewFDI | FDIAcc], SeqAcc1}
-        end,
-        NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
-        {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
-    end, InitAcc, NewDocInfos),
-
-    {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
-    % We need to only use the list of #full_doc_info{} records
-    % that we have actually changed due to a purge.
-    PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
-    Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
-    {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-    Db3 = commit_data(Db2),
-    ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
-    couch_event:notify(Db#db.name, updated),
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    {reply, ok, Db2, idle_limit()};

-    PurgeSeq = couch_db_engine:get_purge_seq(Db3),
-    {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+handle_call({purge_docs, [], _}, _From, Db) ->
+    {reply, {ok, []}, Db, idle_limit()};
+
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+    % Filter out any previously applied updates during
+    % internal replication
+    IsRepl = lists:member(replicated_changes, Options),
+    PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+        PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+        lists:flatmap(fun
+            ({not_found, PReq}) -> [PReq];
+            ({{_, _, _, _}, _}) -> []
+        end, lists:zip(PurgeInfos, PurgeReqs0))
+    end,
+    {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
+    {reply, {ok, Replies}, NewDb, idle_limit()};

 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -656,7 +605,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
     LocalDocs2 = update_local_doc_revs(LocalDocs),

-    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),

     WriteCount = length(IndexFDIs),
     couch_stats:increment_counter([couchdb, document_inserts],
@@ -702,6 +651,87 @@ update_local_doc_revs(Docs) ->
     end, Docs).


+purge_docs(Db, []) ->
+    {ok, Db, []};
+
+purge_docs(Db, PurgeReqs) ->
+    Ids = lists:usort(lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs)),
+    FDIs = couch_db_engine:open_docs(Db, Ids),
+    USeq = couch_db_engine:get_update_seq(Db),
+
+    IdFDIs = lists:zip(Ids, FDIs),
+    {NewIdFDIs, Replies} = apply_purge_reqs(PurgeReqs, IdFDIs, USeq, []),
+
+    Pairs = lists:flatmap(fun({DocId, OldFDI}) ->
+        {DocId, NewFDI} = lists:keyfind(DocId, 1, NewIdFDIs),
+        case {OldFDI, NewFDI} of
+            {not_found, not_found} ->
+                [];
+            {#full_doc_info{} = A, #full_doc_info{} = A} ->
+                [];
+            {#full_doc_info{}, _} ->
+                [{OldFDI, NewFDI}]
+        end
+    end, IdFDIs),
+
+    PSeq = couch_db_engine:get_purge_seq(Db),
+    {RevPInfos, _} = lists:foldl(fun({UUID, DocId, Revs}, {PIAcc, PSeqAcc}) ->
+        Info = {PSeqAcc + 1, UUID, DocId, Revs},
+        {[Info | PIAcc], PSeqAcc + 1}
+    end, {[], PSeq}, PurgeReqs),
+    PInfos = lists:reverse(RevPInfos),
+
+    {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+    Db2 = commit_data(Db1),
+    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    couch_event:notify(Db2#db.name, updated),
+    {ok, Db2, Replies}.
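Note the reply contract here: purge_docs/2 returns one {ok, Revs} entry per
request, in request order, and apply_purge_reqs/4 below makes purging a
missing document or an already-removed revision a quiet no-op rather than an
error. A hypothetical two-request call, seen from couch_db:purge_docs/3:

    Req1 = {couch_uuids:new(), <<"existing-doc">>, [{1, RevId1}]},
    Req2 = {couch_uuids:new(), <<"missing-doc">>, [{1, RevId2}]},
    % First reply lists the purged revisions; the second is the no-op case.
    {ok, [{ok, [{1, RevId1}]}, {ok, []}]} =
        couch_db:purge_docs(Db, [Req1, Req2], []).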
+
+
+apply_purge_reqs([], IdFDIs, _USeq, Replies) ->
+    {IdFDIs, lists:reverse(Replies)};
+
+apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) ->
+    {_UUID, DocId, Revs} = Req,
+    {value, {_, FDI0}, RestIdFDIs} = lists:keytake(DocId, 1, IdFDIs),
+    {NewFDI, RemovedRevs, NewUSeq} = case FDI0 of
+        #full_doc_info{rev_tree = Tree} ->
+            case couch_key_tree:remove_leafs(Tree, Revs) of
+                {_, []} ->
+                    % No change
+                    {FDI0, [], USeq};
+                {[], Removed} ->
+                    % Completely purged
+                    {not_found, Removed, USeq};
+                {NewTree, Removed} ->
+                    % It's possible to purge the #leaf{} that contains
+                    % the update_seq where this doc sits in the
+                    % update_seq sequence. Rather than do a bunch of
+                    % complicated checks we just re-label every #leaf{}
+                    % and reinsert it into the update_seq sequence.
+                    {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+                        (_RevId, Leaf, leaf, SeqAcc) ->
+                            {Leaf#leaf{seq = SeqAcc + 1}, SeqAcc + 1};
+                        (_RevId, Value, _Type, SeqAcc) ->
+                            {Value, SeqAcc}
+                    end, USeq, NewTree),
+
+                    FDI1 = FDI0#full_doc_info{
+                        update_seq = NewUpdateSeq,
+                        rev_tree = NewTree2
+                    },
+                    {FDI1, Removed, NewUpdateSeq}
+            end;
+        not_found ->
+            % Not found means nothing to change
+            {not_found, [], USeq}
+    end,
+    NewIdFDIs = [{DocId, NewFDI} | RestIdFDIs],
+    NewReplies = [{ok, RemovedRevs} | Replies],
+    apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies).
+
+
 commit_data(Db) ->
     commit_data(Db, false).

@@ -731,15 +761,6 @@ pair_write_info(Old, New) ->
     end, New).


-pair_purge_info(Old, New) ->
-    lists:map(fun(OldFDI) ->
-        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
-            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
-            false -> {OldFDI, not_found}
-        end
-    end, Old).
-
-
 get_meta_body_size(Meta) ->
     {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
     ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192..81209d9 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");

 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
     couch_httpd:validate_ctype(Req, "application/json"),
-    {IdsRevs} = couch_httpd:json_body_obj(Req),
-    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+    {IdRevs} = couch_httpd:json_body_obj(Req),
+    PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+        {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+    end, IdRevs),

-    case couch_db:purge_docs(Db, IdsRevs2) of
-        {ok, PurgeSeq, PurgedIdsRevs} ->
-            PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
-            send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
-        Error ->
-            throw(Error)
-    end;
+    {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+    Results = lists:zipwith(fun({Id, _}, {ok, Reply}) ->
+        {Id, couch_doc:revs_to_strs(Reply)}
+    end, IdRevs, Replies),
+
+    {ok, Db2} = couch_db:reopen(Db),
+    PurgeSeq = couch_db:get_purge_seq(Db2),
+    send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});

 db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
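For reference, the wire format this handler keeps is close to the classic
single-node _purge API, with the purge sequence now read from a reopened
handle after the purge completes. An illustrative exchange; the database,
document, revision, and sequence values are made up:

    POST /mydb/_purge HTTP/1.1
    Content-Type: application/json

    {"mydoc": ["1-767138f6af521c456b6b0ecb59db1f72"]}

    HTTP/1.1 200 OK

    {"purge_seq": 4,
     "purged": {"mydoc": ["1-767138f6af521c456b6b0ecb59db1f72"]}}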