Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DB42B11B43 for ; Thu, 28 Aug 2014 12:11:38 +0000 (UTC) Received: (qmail 3705 invoked by uid 500); 28 Aug 2014 12:11:38 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 3485 invoked by uid 500); 28 Aug 2014 12:11:38 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 3177 invoked by uid 99); 28 Aug 2014 12:11:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 12:11:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E1235A02BA8; Thu, 28 Aug 2014 12:11:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Thu, 28 Aug 2014 12:11:41 -0000 Message-Id: In-Reply-To: <149709301f134b60b82ddbd5d3fa6544@git.apache.org> References: <149709301f134b60b82ddbd5d3fa6544@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/50] couch commit: updated refs/heads/master to 9d0ac7d Handle duplicate doc updates in single commit Before this commit, there were bugs in couch_db and couch_db_updater which caused couch to incorrectly report that all duplicate docs in a group commit failed due to conflicts. The changes in this commit cause couch to correctly report that one of the duplicate updates in the group commit succeeded. This commit also changes behavior slightly by applying duplicate doc updates in the order they were supplied. This change is for consistency with CouchDB. 
This work is based on the patch by Bob Dionne. BugzID: 12540 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/74358fce Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/74358fce Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/74358fce Branch: refs/heads/master Commit: 74358fceecbb810645c305baede26976059158b7 Parents: 00ce8a1 Author: Benjamin Bastian Authored: Tue Aug 27 11:26:58 2013 -0700 Committer: Robert Newson Committed: Thu Aug 28 12:59:59 2014 +0100 ---------------------------------------------------------------------- src/couch_db.erl | 95 ++++++++++++++++++++++++++----------------- src/couch_db_updater.erl | 52 +++++++++++++++++------ src/couch_doc.erl | 12 +++--- 3 files changed, 103 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/74358fce/src/couch_db.erl ---------------------------------------------------------------------- diff --git a/src/couch_db.erl b/src/couch_db.erl index 0b9e490..b0c7894 100644 --- a/src/couch_db.erl +++ b/src/couch_db.erl @@ -500,11 +500,17 @@ update_docs(Db, Docs) -> % group_alike_docs groups the sorted documents into sublist buckets, by id. % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] group_alike_docs(Docs) -> - Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs), - group_alike_docs(Sorted, []). + % Here we're just asserting that our doc sort is stable so that + % if we have duplicate docids we don't have to worry about the + % behavior of lists:sort/2 which isn't documented anyhwere as + % being stable. + WithPos = lists:zip(Docs, lists:seq(1, length(Docs))), + SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end, + SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)], + group_alike_docs(SortedDocs, []). 
group_alike_docs([], Buckets) -> - lists:reverse(Buckets); + lists:reverse(lists:map(fun lists:reverse/1, Buckets)); group_alike_docs([Doc|Rest], []) -> group_alike_docs(Rest, [[Doc]]); group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> @@ -627,10 +633,10 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, AccFatalErrors) -> - {AccPrepped, AccFatalErrors}; + AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)), + {AccPrepped2, AccFatalErrors}; prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AllowConflict, AccPrepped, AccErrors) -> - [#doc{id=Id}|_]=DocBucket, % no existing revs are known, {PreppedBucket, AccErrors3} = lists:foldl( fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> @@ -645,11 +651,11 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], ok -> {[Doc | AccBucket], AccErrors2}; Error -> - {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]} + {AccBucket, [{doc_tag(Doc), Error} | AccErrors2]} end; _ -> % old revs specified but none exist, a conflict - {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]} + {AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]} end end, {[], AccErrors}, DocBucket), @@ -670,9 +676,9 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], LeafRevsDict, AllowConflict) of {ok, Doc2} -> {[Doc2 | Docs2Acc], AccErrors2}; - {Error, #doc{id=Id,revs=Revs}} -> + {Error, _} -> % Record the error - {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]} + {Docs2Acc, [{doc_tag(Doc), Error} |AccErrors2]} end end, {[], AccErrors}, DocBucket), @@ -687,7 +693,8 @@ update_docs(Db, Docs, Options) -> prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> Errors2 = [{{Id, {Pos, Rev}}, Error} || {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors], - {lists:reverse(AccPrepped), lists:reverse(Errors2)}; + AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, 
AccPrepped)), + {AccPrepped2, lists:reverse(Errors2)}; prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) -> case OldInfo of not_found -> @@ -782,10 +789,10 @@ new_revs([], OutBuckets, IdRevsAcc) -> {lists:reverse(OutBuckets), IdRevsAcc}; new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) -> {NewBucket, IdRevsAcc3} = lists:mapfoldl( - fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)-> + fun(#doc{revs={Start, RevIds}}=Doc, IdRevsAcc2)-> NewRevId = new_revid(Doc), {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, - [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} + [{doc_tag(Doc), {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} end, IdRevsAcc, Bucket), new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3). @@ -802,9 +809,23 @@ check_dup_atts2(_) -> ok. -update_docs(Db, Docs, Options, replicated_changes) -> +tag_docs([]) -> + []; +tag_docs([#doc{meta=Meta}=Doc | Rest]) -> + [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)]. + +doc_tag(#doc{meta=Meta}) -> + case lists:keyfind(ref, 1, Meta) of + {ref, Ref} when is_reference(Ref) -> Ref; + false -> throw(doc_not_tagged); + Else -> throw({invalid_doc_tag, Else}) + end. + +update_docs(Db, Docs0, Options, replicated_changes) -> increment_stat(Db, {couchdb, database_writes}), + Docs = tag_docs(Docs0), DocBuckets = before_docs_update(Db, group_alike_docs(Docs)), + case (Db#db.validate_doc_funs /= []) orelse lists:any( fun(#doc{id= <>}) -> true; @@ -827,21 +848,17 @@ update_docs(Db, Docs, Options, replicated_changes) -> {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; -update_docs(Db, Docs, Options, interactive_edit) -> +update_docs(Db, Docs0, Options, interactive_edit) -> increment_stat(Db, {couchdb, database_writes}), AllOrNothing = lists:member(all_or_nothing, Options), - % go ahead and generate the new revision ids for the documents. 
- % separate out the NonRep documents from the rest of the documents - - {Docs2, NonRepDocs} = lists:foldl( - fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) -> - case Id of - <> -> - {DocsAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Doc | DocsAcc], NonRepDocsAcc} - end - end, {[], []}, Docs), + Docs = tag_docs(Docs0), + + % Separate _local docs from normal docs + IsLocal = fun + (#doc{id= <>}) -> true; + (_) -> false + end, + {NonRepDocs, Docs2} = lists:partition(IsLocal, Docs), DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)), @@ -868,12 +885,14 @@ update_docs(Db, Docs, Options, interactive_edit) -> end, if (AllOrNothing) and (PreCommitFailures /= []) -> - {aborted, lists:map( - fun({{Id,{Pos, [RevId|_]}}, Error}) -> - {{Id, {Pos, RevId}}, Error}; - ({{Id,{0, []}}, Error}) -> - {{Id, {0, <<>>}}, Error} - end, PreCommitFailures)}; + RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]), + {aborted, lists:map(fun({Ref, Error}) -> + #doc{id=Id,revs={Start,RevIds}} = dict:fetch(Ref, RefErrorDict), + case {Start, RevIds} of + {Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error}; + {0, []} -> {{Id, {0, <<>>}}, Error} + end + end, PreCommitFailures)}; true -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, @@ -885,12 +904,12 @@ update_docs(Db, Docs, Options, interactive_edit) -> {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2), - ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), - {ok, lists:map( - fun(#doc{id=Id,revs={Pos, RevIds}}) -> - {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), - Result - end, Docs)} + ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) -> + dict:store(Key, Resp, ResultsAcc) + end, dict:from_list(IdRevs), CommitResults ++ PreCommitFailures), + {ok, lists:map(fun(Doc) -> + dict:fetch(doc_tag(Doc), ResultsDict) + end, Docs)} end. % Returns the first available document on disk. 
Input list is a full rev path http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/74358fce/src/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl index 09028bd..7c2a43a 100644 --- a/src/couch_db_updater.erl +++ b/src/couch_db_updater.erl @@ -264,7 +264,7 @@ handle_cast(Msg, #db{name = Name} = Db) -> handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, FullCommit}, Db) -> - GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs], + GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs), if NonRepDocs == [] -> {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2, [Client], MergeConflicts, FullCommit); @@ -319,6 +319,20 @@ handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +maybe_tag_grouped_docs(Client, GroupedDocs) -> + lists:map(fun(DocGroup) -> + [{Client, maybe_tag_doc(D)} || D <- DocGroup] + end, GroupedDocs). + +maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) -> + case lists:keymember(ref, 1, Meta0) of + true -> + Doc; + false -> + Key = {Id, {Pos-1, PrevRevs}}, + Doc#doc{meta=[{ref, Key} | Meta0]} + end. + merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) -> [A++B | merge_updates(RestA, RestB)]; merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y -> @@ -337,8 +351,7 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> % updaters than deal with their possible conflicts, and local docs % writes are relatively rare. Can be optmized later if really needed. 
{update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} -> - GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup] - || DocGroup <- GroupedDocs], + GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs), GroupedDocsAcc2 = merge_updates(GroupedDocsAcc, GroupedDocs2), collect_updates(GroupedDocsAcc2, [Client | ClientsAcc], @@ -597,9 +610,16 @@ flush_trees(#db{fd = Fd} = Db, flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]). -send_result(Client, Id, OriginalRevs, NewResult) -> +send_result(Client, Doc, NewResult) -> % used to send a result to the client - catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). + catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}). + +doc_tag(#doc{meta=Meta}) -> + case lists:keyfind(ref, 1, Meta) of + {ref, Ref} -> Ref; + false -> throw(no_doc_tag); + Else -> throw({invalid_doc_tag, Else}) + end. merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; @@ -613,7 +633,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc), Limit) of {_NewTree, conflicts} when (not OldDeleted) -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted}; {NewTree, conflicts} when PrevRevs /= [] -> % Check to be sure if prev revision was specified, it's @@ -625,7 +645,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], if IsPrevLeaf -> {NewTree, OldDeleted}; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted} end; {NewTree, no_conflicts} when AccTree == NewTree -> @@ -644,11 +664,11 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], {NewTree2, _} = couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc2), Limit), % we changed the rev id, this tells the caller we did - send_result(Client, Id, 
{Pos-1,PrevRevs}, - {ok, {OldPos + 1, NewRevId}}), + send_result(Client, NewDoc, + {ok, {OldPos + 1, NewRevId}}), {NewTree2, OldDeleted}; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted} end; {NewTree, _} -> @@ -754,7 +774,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) -> Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, _OldDocLookup) -> + fun({Client, NewDoc}, _OldDocLookup) -> + #doc{ + id = Id, + deleted = Delete, + revs = {0, PrevRevs}, + body = Body + } = NewDoc, case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); @@ -771,11 +797,11 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) -> % true -> case Delete of false -> - send_result(Client, Id, {0, PrevRevs}, {ok, + send_result(Client, NewDoc, {ok, {0, ?l2b(integer_to_list(PrevRev + 1))}}), {update, {Id, {PrevRev + 1, Body}}}; true -> - send_result(Client, Id, {0, PrevRevs}, + send_result(Client, NewDoc, {ok, {0, <<"0">>}}), {remove, Id} end%; http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/74358fce/src/couch_doc.erl ---------------------------------------------------------------------- diff --git a/src/couch_doc.erl b/src/couch_doc.erl index 05202f4..14f6a4f 100644 --- a/src/couch_doc.erl +++ b/src/couch_doc.erl @@ -70,7 +70,7 @@ revs_to_strs([{Pos, RevId}| Rest]) -> [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)]. 
to_json_meta(Meta) -> - lists:map( + lists:flatmap( fun({revs_info, Start, RevsInfo}) -> {JsonRevsInfo, _Pos} = lists:mapfoldl( fun({RevId, Status}, PosAcc) -> @@ -78,13 +78,15 @@ to_json_meta(Meta) -> {<<"status">>, ?l2b(atom_to_list(Status))}]}, {JsonObj, PosAcc - 1} end, Start, RevsInfo), - {<<"_revs_info">>, JsonRevsInfo}; + [{<<"_revs_info">>, JsonRevsInfo}]; ({local_seq, Seq}) -> - {<<"_local_seq">>, Seq}; + [{<<"_local_seq">>, Seq}]; ({conflicts, Conflicts}) -> - {<<"_conflicts">>, revs_to_strs(Conflicts)}; + [{<<"_conflicts">>, revs_to_strs(Conflicts)}]; ({deleted_conflicts, DConflicts}) -> - {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)} + [{<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}]; + (_) -> + [] end, Meta). to_json_attachments(Attachments, Options) ->