Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B1A9C702 for ; Thu, 7 Aug 2014 15:37:27 +0000 (UTC) Received: (qmail 87608 invoked by uid 500); 7 Aug 2014 15:37:25 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 87431 invoked by uid 500); 7 Aug 2014 15:37:25 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 86970 invoked by uid 99); 7 Aug 2014 15:37:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Aug 2014 15:37:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7D6D39C29C8; Thu, 7 Aug 2014 15:37:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Thu, 07 Aug 2014 15:37:49 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/50] couch commit: updated refs/heads/windsor-merge to 6e60cbe Handle duplicate doc updates in single commit Before this commit, there were bugs in couch_db and couch_db_updater which caused couch to incorrectly report that all duplicate docs in a group commit failed from conflicts. The changes in this commit cause couch to correctly report that one of the duplicate updates in the group commit succeeded. This commit also changes behavior slightly by applying duplicate doc updates in the order they were supplied. This change is for consistency with CouchDB. This work is based on the patch by Bob Dionne. BugzID: 12540 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/46607e03 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/46607e03 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/46607e03 Branch: refs/heads/windsor-merge Commit: 46607e03d2a06a95ff38d6256e037450e886a837 Parents: fcce4b7 Author: Benjamin Bastian Authored: Tue Aug 27 11:26:58 2013 -0700 Committer: Robert Newson Committed: Wed Aug 6 10:34:17 2014 +0100 ---------------------------------------------------------------------- src/couch_db.erl | 95 ++++++++++++++++++++++++++----------------- src/couch_db_updater.erl | 52 +++++++++++++++++------ src/couch_doc.erl | 12 +++--- 3 files changed, 103 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_db.erl ---------------------------------------------------------------------- diff --git a/src/couch_db.erl b/src/couch_db.erl index 0b9e490..b0c7894 100644 --- a/src/couch_db.erl +++ b/src/couch_db.erl @@ -500,11 +500,17 @@ update_docs(Db, Docs) -> % group_alike_docs groups the sorted documents into sublist buckets, by id. % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] group_alike_docs(Docs) -> - Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs), - group_alike_docs(Sorted, []). + % Here we're just asserting that our doc sort is stable so that + % if we have duplicate docids we don't have to worry about the + % behavior of lists:sort/2 which isn't documented anyhwere as + % being stable. + WithPos = lists:zip(Docs, lists:seq(1, length(Docs))), + SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end, + SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)], + group_alike_docs(SortedDocs, []). group_alike_docs([], Buckets) -> - lists:reverse(Buckets); + lists:reverse(lists:map(fun lists:reverse/1, Buckets)); group_alike_docs([Doc|Rest], []) -> group_alike_docs(Rest, [[Doc]]); group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> @@ -627,10 +633,10 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, AccFatalErrors) -> - {AccPrepped, AccFatalErrors}; + AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)), + {AccPrepped2, AccFatalErrors}; prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AllowConflict, AccPrepped, AccErrors) -> - [#doc{id=Id}|_]=DocBucket, % no existing revs are known, {PreppedBucket, AccErrors3} = lists:foldl( fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> @@ -645,11 +651,11 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], ok -> {[Doc | AccBucket], AccErrors2}; Error -> - {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]} + {AccBucket, [{doc_tag(Doc), Error} | AccErrors2]} end; _ -> % old revs specified but none exist, a conflict - {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]} + {AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]} end end, {[], AccErrors}, DocBucket), @@ -670,9 +676,9 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], LeafRevsDict, AllowConflict) of {ok, Doc2} -> {[Doc2 | Docs2Acc], AccErrors2}; - {Error, #doc{id=Id,revs=Revs}} -> + {Error, _} -> % Record the error - {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]} + {Docs2Acc, [{doc_tag(Doc), Error} |AccErrors2]} end end, {[], AccErrors}, DocBucket), @@ -687,7 +693,8 @@ update_docs(Db, Docs, Options) -> prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> Errors2 = [{{Id, {Pos, Rev}}, Error} || {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors], - {lists:reverse(AccPrepped), lists:reverse(Errors2)}; + AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)), + {AccPrepped2, lists:reverse(Errors2)}; prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) -> case OldInfo of not_found -> @@ -782,10 +789,10 @@ new_revs([], OutBuckets, IdRevsAcc) -> {lists:reverse(OutBuckets), IdRevsAcc}; new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) -> {NewBucket, IdRevsAcc3} = lists:mapfoldl( - fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)-> + fun(#doc{revs={Start, RevIds}}=Doc, IdRevsAcc2)-> NewRevId = new_revid(Doc), {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, - [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} + [{doc_tag(Doc), {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} end, IdRevsAcc, Bucket), new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3). @@ -802,9 +809,23 @@ check_dup_atts2(_) -> ok. -update_docs(Db, Docs, Options, replicated_changes) -> +tag_docs([]) -> + []; +tag_docs([#doc{meta=Meta}=Doc | Rest]) -> + [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)]. + +doc_tag(#doc{meta=Meta}) -> + case lists:keyfind(ref, 1, Meta) of + {ref, Ref} when is_reference(Ref) -> Ref; + false -> throw(doc_not_tagged); + Else -> throw({invalid_doc_tag, Else}) + end. + +update_docs(Db, Docs0, Options, replicated_changes) -> increment_stat(Db, {couchdb, database_writes}), + Docs = tag_docs(Docs0), DocBuckets = before_docs_update(Db, group_alike_docs(Docs)), + case (Db#db.validate_doc_funs /= []) orelse lists:any( fun(#doc{id= <>}) -> true; @@ -827,21 +848,17 @@ update_docs(Db, Docs, Options, replicated_changes) -> {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; -update_docs(Db, Docs, Options, interactive_edit) -> +update_docs(Db, Docs0, Options, interactive_edit) -> increment_stat(Db, {couchdb, database_writes}), AllOrNothing = lists:member(all_or_nothing, Options), - % go ahead and generate the new revision ids for the documents. - % separate out the NonRep documents from the rest of the documents - - {Docs2, NonRepDocs} = lists:foldl( - fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) -> - case Id of - <> -> - {DocsAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Doc | DocsAcc], NonRepDocsAcc} - end - end, {[], []}, Docs), + Docs = tag_docs(Docs0), + + % Separate _local docs from normal docs + IsLocal = fun + (#doc{id= <>}) -> true; + (_) -> false + end, + {NonRepDocs, Docs2} = lists:partition(IsLocal, Docs), DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)), @@ -868,12 +885,14 @@ update_docs(Db, Docs, Options, interactive_edit) -> end, if (AllOrNothing) and (PreCommitFailures /= []) -> - {aborted, lists:map( - fun({{Id,{Pos, [RevId|_]}}, Error}) -> - {{Id, {Pos, RevId}}, Error}; - ({{Id,{0, []}}, Error}) -> - {{Id, {0, <<>>}}, Error} - end, PreCommitFailures)}; + RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]), + {aborted, lists:map(fun({Ref, Error}) -> + #doc{id=Id,revs={Start,RevIds}} = dict:fetch(Ref, RefErrorDict), + case {Start, RevIds} of + {Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error}; + {0, []} -> {{Id, {0, <<>>}}, Error} + end + end, PreCommitFailures)}; true -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, @@ -885,12 +904,12 @@ update_docs(Db, Docs, Options, interactive_edit) -> {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2), - ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), - {ok, lists:map( - fun(#doc{id=Id,revs={Pos, RevIds}}) -> - {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), - Result - end, Docs)} + ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) -> + dict:store(Key, Resp, ResultsAcc) + end, dict:from_list(IdRevs), CommitResults ++ PreCommitFailures), + {ok, lists:map(fun(Doc) -> + dict:fetch(doc_tag(Doc), ResultsDict) + end, Docs)} end. % Returns the first available document on disk. Input list is a full rev path http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl index 09028bd..7c2a43a 100644 --- a/src/couch_db_updater.erl +++ b/src/couch_db_updater.erl @@ -264,7 +264,7 @@ handle_cast(Msg, #db{name = Name} = Db) -> handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, FullCommit}, Db) -> - GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs], + GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs), if NonRepDocs == [] -> {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2, [Client], MergeConflicts, FullCommit); @@ -319,6 +319,20 @@ handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +maybe_tag_grouped_docs(Client, GroupedDocs) -> + lists:map(fun(DocGroup) -> + [{Client, maybe_tag_doc(D)} || D <- DocGroup] + end, GroupedDocs). + +maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) -> + case lists:keymember(ref, 1, Meta0) of + true -> + Doc; + false -> + Key = {Id, {Pos-1, PrevRevs}}, + Doc#doc{meta=[{ref, Key} | Meta0]} + end. + merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) -> [A++B | merge_updates(RestA, RestB)]; merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y -> @@ -337,8 +351,7 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> % updaters than deal with their possible conflicts, and local docs % writes are relatively rare. Can be optmized later if really needed. {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} -> - GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup] - || DocGroup <- GroupedDocs], + GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs), GroupedDocsAcc2 = merge_updates(GroupedDocsAcc, GroupedDocs2), collect_updates(GroupedDocsAcc2, [Client | ClientsAcc], @@ -597,9 +610,16 @@ flush_trees(#db{fd = Fd} = Db, flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]). -send_result(Client, Id, OriginalRevs, NewResult) -> +send_result(Client, Doc, NewResult) -> % used to send a result to the client - catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). + catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}). + +doc_tag(#doc{meta=Meta}) -> + case lists:keyfind(ref, 1, Meta) of + {ref, Ref} -> Ref; + false -> throw(no_doc_tag); + Else -> throw({invalid_doc_tag, Else}) + end. merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; @@ -613,7 +633,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc), Limit) of {_NewTree, conflicts} when (not OldDeleted) -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted}; {NewTree, conflicts} when PrevRevs /= [] -> % Check to be sure if prev revision was specified, it's @@ -625,7 +645,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], if IsPrevLeaf -> {NewTree, OldDeleted}; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted} end; {NewTree, no_conflicts} when AccTree == NewTree -> @@ -644,11 +664,11 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], {NewTree2, _} = couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc2), Limit), % we changed the rev id, this tells the caller we did - send_result(Client, Id, {Pos-1,PrevRevs}, - {ok, {OldPos + 1, NewRevId}}), + send_result(Client, NewDoc, + {ok, {OldPos + 1, NewRevId}}), {NewTree2, OldDeleted}; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, NewDoc, conflict), {AccTree, OldDeleted} end; {NewTree, _} -> @@ -754,7 +774,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) -> Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, _OldDocLookup) -> + fun({Client, NewDoc}, _OldDocLookup) -> + #doc{ + id = Id, + deleted = Delete, + revs = {0, PrevRevs}, + body = Body + } = NewDoc, case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); @@ -771,11 +797,11 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) -> % true -> case Delete of false -> - send_result(Client, Id, {0, PrevRevs}, {ok, + send_result(Client, NewDoc, {ok, {0, ?l2b(integer_to_list(PrevRev + 1))}}), {update, {Id, {PrevRev + 1, Body}}}; true -> - send_result(Client, Id, {0, PrevRevs}, + send_result(Client, NewDoc, {ok, {0, <<"0">>}}), {remove, Id} end%; http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_doc.erl ---------------------------------------------------------------------- diff --git a/src/couch_doc.erl b/src/couch_doc.erl index 05202f4..14f6a4f 100644 --- a/src/couch_doc.erl +++ b/src/couch_doc.erl @@ -70,7 +70,7 @@ revs_to_strs([{Pos, RevId}| Rest]) -> [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)]. to_json_meta(Meta) -> - lists:map( + lists:flatmap( fun({revs_info, Start, RevsInfo}) -> {JsonRevsInfo, _Pos} = lists:mapfoldl( fun({RevId, Status}, PosAcc) -> @@ -78,13 +78,15 @@ to_json_meta(Meta) -> {<<"status">>, ?l2b(atom_to_list(Status))}]}, {JsonObj, PosAcc - 1} end, Start, RevsInfo), - {<<"_revs_info">>, JsonRevsInfo}; + [{<<"_revs_info">>, JsonRevsInfo}]; ({local_seq, Seq}) -> - {<<"_local_seq">>, Seq}; + [{<<"_local_seq">>, Seq}]; ({conflicts, Conflicts}) -> - {<<"_conflicts">>, revs_to_strs(Conflicts)}; + [{<<"_conflicts">>, revs_to_strs(Conflicts)}]; ({deleted_conflicts, DConflicts}) -> - {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)} + [{<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}]; + (_) -> + [] end, Meta). to_json_attachments(Attachments, Options) ->