Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4503D9624 for ; Fri, 18 Nov 2011 20:35:48 +0000 (UTC) Received: (qmail 27319 invoked by uid 500); 18 Nov 2011 20:35:48 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 27285 invoked by uid 500); 18 Nov 2011 20:35:48 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 27278 invoked by uid 99); 18 Nov 2011 20:35:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2011 20:35:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.114] (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2011 20:35:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2239F54D82; Fri, 18 Nov 2011 20:35:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bitdiddle@apache.org To: commits@couchdb.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: Add references to docs to prevent dups from being collapsed Message-Id: <20111118203526.2239F54D82@tyr.zones.apache.org> Date: Fri, 18 Nov 2011 20:35:26 +0000 (UTC) Updated Branches: refs/heads/master bcf746557 -> 5b1430c12 Add references to docs to prevent dups from being collapsed Bulk docs requests may have duplicates or multiple docs with the same id. 
These were being collapsed in a dictionary as messages are passed from merge_rev_trees in couch_db_updater to collect_results in couch_db. Attaching a reference allows each to be processed correctly. Jira-911 Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/5b1430c1 Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/5b1430c1 Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/5b1430c1 Branch: refs/heads/master Commit: 5b1430c120904181313848444dbfcdb60e42568b Parents: bcf7465 Author: Bob Dionne Authored: Tue Oct 11 16:04:09 2011 -0400 Committer: Bob Dionne Committed: Fri Nov 18 15:34:57 2011 -0500 ---------------------------------------------------------------------- src/couchdb/couch_db.erl | 118 +++++++++++++++++---------------- src/couchdb/couch_db_updater.erl | 31 ++++----- 2 files changed, 77 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/5b1430c1/src/couchdb/couch_db.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 0454f3e..2d7c45e 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -436,22 +436,22 @@ update_docs(Db, Docs) -> % group_alike_docs groups the sorted documents into sublist buckets, by id. % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] group_alike_docs(Docs) -> - Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs), + Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs), group_alike_docs(Sorted, []). 
group_alike_docs([], Buckets) -> - lists:reverse(Buckets); + lists:reverse(lists:map(fun lists:reverse/1, Buckets)); group_alike_docs([Doc|Rest], []) -> group_alike_docs(Rest, [[Doc]]); -group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> - [#doc{id=BucketId}|_] = Bucket, +group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) -> + [{#doc{id=BucketId},_Ref}|_] = Bucket, case Doc#doc.id == BucketId of true -> % add to existing bucket - group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]); + group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]); false -> % add to new bucket - group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]]) + group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]]) end. validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> @@ -514,10 +514,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, {AccPrepped, AccFatalErrors}; prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AllowConflict, AccPrepped, AccErrors) -> - [#doc{id=Id}|_]=DocBucket, - % no existing revs are known, {PreppedBucket, AccErrors3} = lists:foldl( - fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> + fun({#doc{revs=Revs}=Doc,Ref}, {AccBucket, AccErrors2}) -> case couch_doc:has_stubs(Doc) of true -> couch_doc:merge_stubs(Doc, #doc{}); % will throw exception @@ -527,19 +525,19 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], {0, []} -> case validate_doc_update(Db, Doc, fun() -> nil end) of ok -> - {[Doc | AccBucket], AccErrors2}; + {[{Doc, Ref} | AccBucket], AccErrors2}; Error -> - {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]} + {AccBucket, [{Ref, Error} | AccErrors2]} end; _ -> % old revs specified but none exist, a conflict - {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]} + {AccBucket, [{Ref, conflict} | AccErrors2]} end end, {[], AccErrors}, DocBucket), prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, - [PreppedBucket | 
AccPrepped], AccErrors3); + [lists:reverse(PreppedBucket) | AccPrepped], AccErrors3); prep_and_validate_updates(Db, [DocBucket|RestBuckets], [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], AllowConflict, AccPrepped, AccErrors) -> @@ -553,14 +551,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], {LeafVal, {Start, [RevId | _]} = Revs} <- Leafs ]), {PreppedBucket, AccErrors3} = lists:foldl( - fun(Doc, {Docs2Acc, AccErrors2}) -> + fun({Doc, Ref}, {Docs2Acc, AccErrors2}) -> case prep_and_validate_update(Db, Doc, OldFullDocInfo, LeafRevsDict, AllowConflict) of {ok, Doc2} -> - {[Doc2 | Docs2Acc], AccErrors2}; - {Error, #doc{id=Id,revs=Revs}} -> + {[{Doc2, Ref} | Docs2Acc], AccErrors2}; + {Error, #doc{}} -> % Record the error - {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]} + {Docs2Acc, [{Ref, Error} |AccErrors2]} end end, {[], AccErrors}, DocBucket), @@ -580,7 +578,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI case OldInfo of not_found -> {ValidatedBucket, AccErrors3} = lists:foldl( - fun(Doc, {AccPrepped2, AccErrors2}) -> + fun({Doc, Ref}, {AccPrepped2, AccErrors2}) -> case couch_doc:has_stubs(Doc) of true -> couch_doc:merge_stubs(Doc, #doc{}); % will throw exception @@ -588,7 +586,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI end, case validate_doc_update(Db, Doc, fun() -> nil end) of ok -> - {[Doc | AccPrepped2], AccErrors2}; + {[{Doc, Ref} | AccPrepped2], AccErrors2}; Error -> {AccPrepped2, [{Doc, Error} | AccErrors2]} end @@ -597,7 +595,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3); {ok, #full_doc_info{rev_tree=OldTree}} -> NewRevTree = lists:foldl( - fun(NewDoc, AccTree) -> + fun({NewDoc, _Ref}, AccTree) -> {NewTree, _} = couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc), Db#db.revs_limit), NewTree 
@@ -607,7 +605,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]), {ValidatedBucket, AccErrors3} = lists:foldl( - fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) -> + fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) -> case dict:find({Pos, RevId}, LeafRevsFullDict) of {ok, {Start, Path}} -> % our unflushed doc is a leaf node. Go back on the path @@ -629,7 +627,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI case validate_doc_update(Db, Doc2, GetDiskDocFun) of ok -> - {[Doc2 | AccValidated], AccErrors2}; + {[{Doc2, Ref} | AccValidated], AccErrors2}; Error -> {AccValidated, [{Doc, Error} | AccErrors2]} end; @@ -661,10 +659,10 @@ new_revs([], OutBuckets, IdRevsAcc) -> {lists:reverse(OutBuckets), IdRevsAcc}; new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) -> {NewBucket, IdRevsAcc3} = lists:mapfoldl( - fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)-> + fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)-> NewRevId = new_revid(Doc), - {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, - [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} + {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref}, + [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} end, IdRevsAcc, Bucket), new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3). 
@@ -683,16 +681,17 @@ check_dup_atts2(_) -> update_docs(Db, Docs, Options, replicated_changes) -> increment_stat(Db, {couchdb, database_writes}), - DocBuckets = group_alike_docs(Docs), - + % associate reference with each doc in order to track duplicates + Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs), + DocBuckets = group_alike_docs(Docs2), case (Db#db.validate_doc_funs /= []) orelse lists:any( - fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true; - (#doc{atts=Atts}) -> + fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true; + ({#doc{atts=Atts}, _Ref}) -> Atts /= [] - end, Docs) of + end, Docs2) of true -> - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets], ExistingDocs = get_full_doc_infos(Db, Ids), {DocBuckets2, DocErrors} = @@ -702,8 +701,8 @@ update_docs(Db, Docs, Options, replicated_changes) -> DocErrors = [], DocBuckets3 = DocBuckets end, - DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd) - || Doc <- Bucket] || Bucket <- DocBuckets3], + DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd), Ref} + || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3], {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; @@ -712,28 +711,31 @@ update_docs(Db, Docs, Options, interactive_edit) -> AllOrNothing = lists:member(all_or_nothing, Options), % go ahead and generate the new revision ids for the documents. 
% separate out the NonRep documents from the rest of the documents - {Docs2, NonRepDocs} = lists:foldl( - fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) -> + + % associate reference with each doc in order to track duplicates + Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs), + {Docs3, NonRepDocs} = lists:foldl( + fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) -> case Id of <<?LOCAL_DOC_PREFIX, _/binary>> -> {DocsAcc, [Doc | NonRepDocsAcc]}; Id-> {[Doc | DocsAcc], NonRepDocsAcc} end - end, {[], []}, Docs), + end, {[], []}, Docs2), - DocBuckets = group_alike_docs(Docs2), + DocBuckets = group_alike_docs(Docs3), case (Db#db.validate_doc_funs /= []) orelse lists:any( - fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> + fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true; - (#doc{atts=Atts}) -> + ({#doc{atts=Atts}, _Ref}) -> Atts /= [] - end, Docs2) of + end, Docs3) of true -> % lookup the doc by id and get the most recent - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets], ExistingDocInfos = get_full_doc_infos(Db, Ids), {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db, @@ -747,29 +749,33 @@ update_docs(Db, Docs, Options, interactive_edit) -> end, if (AllOrNothing) and (PreCommitFailures /= []) -> - {aborted, lists:map( - fun({{Id,{Pos, [RevId|_]}}, Error}) -> - {{Id, {Pos, RevId}}, Error}; - ({{Id,{0, []}}, Error}) -> - {{Id, {0, <<>>}}, Error} - end, PreCommitFailures)}; + {aborted, + lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) -> + case lists:keyfind(Ref,1,PreCommitFailures) of + {Ref, Error} -> + [{{Id,{Pos,RevIds}}, Error} | Acc]; + false -> + Acc + end + end,[],Docs3)}; + true -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, DocBuckets3 = [[ - doc_flush_atts(set_new_att_revpos( - check_dup_atts(Doc)), Db#db.updater_fd) - || Doc <- B] || B <- DocBuckets2], + {doc_flush_atts(set_new_att_revpos( + check_dup_atts(Doc)), Db#db.updater_fd), Ref} + || {Doc, Ref} <- B] || B <- DocBuckets2], {DocBuckets4, IdRevs} = 
new_revs(DocBuckets3, [], []), {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2), ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), {ok, lists:map( - fun(#doc{id=Id,revs={Pos, RevIds}}) -> - {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), + fun({#doc{}, Ref}) -> + {ok, Result} = dict:find(Ref, ResultsDict), Result - end, Docs)} + end, Docs2)} end. % Returns the first available document on disk. Input list is a full rev path @@ -832,7 +838,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1, % compaction. Retry by reopening the db and writing to the current file {ok, Db2} = open_ref_counted(Db#db.main_pid, self()), DocBuckets2 = [ - [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] || + [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets1 ], % We only retry once @@ -851,7 +857,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1, prepare_doc_summaries(Db, BucketList) -> [lists:map( - fun(#doc{body = Body, atts = Atts} = Doc) -> + fun({#doc{body = Body, atts = Atts} = Doc, Ref}) -> DiskAtts = [{N, T, P, AL, DL, R, M, E} || #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R, att_len = AL, disk_len = DL, encoding = E} <- Atts], @@ -862,7 +868,7 @@ prepare_doc_summaries(Db, BucketList) -> nil end, SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}), - Doc#doc{body = {summary, SummaryChunk, AttsFd}} + {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref} end, Bucket) || Bucket <- BucketList]. 
http://git-wip-us.apache.org/repos/asf/couchdb/blob/5b1430c1/src/couchdb/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index e15a944..2b6635c 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -274,8 +274,8 @@ merge_updates([], RestB, AccOutGroups) -> lists:reverse(AccOutGroups, RestB); merge_updates(RestA, [], AccOutGroups) -> lists:reverse(AccOutGroups, RestA); -merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA], - [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) -> +merge_updates([[{_, {#doc{id=IdA}, _}}|_]=GroupA | RestA], + [[{_, {#doc{id=IdB}, _}}|_]=GroupB | RestB], AccOutGroups) -> if IdA == IdB -> merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]); IdA < IdB -> @@ -565,9 +565,9 @@ flush_trees(#db{updater_fd = Fd} = Db, flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]). -send_result(Client, Id, OriginalRevs, NewResult) -> +send_result(Client, Ref, NewResult) -> % used to send a result to the client - catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). + catch(Client ! {result, self(), {Ref, NewResult}}). 
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; @@ -576,12 +576,12 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, NewRevTree = lists:foldl( - fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> + fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, AccTree) -> if not MergeConflicts -> case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc), Limit) of {_NewTree, conflicts} when (not OldDeleted) -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, Ref, conflict), AccTree; {NewTree, conflicts} when PrevRevs /= [] -> % Check to be sure if prev revision was specified, it's @@ -593,7 +593,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], if IsPrevLeaf -> NewTree; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, Ref, conflict), AccTree end; {NewTree, no_conflicts} when AccTree == NewTree -> @@ -612,11 +612,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], {NewTree2, _} = couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc2), Limit), % we changed the rev id, this tells the caller we did - send_result(Client, Id, {Pos-1,PrevRevs}, - {ok, {OldPos + 1, NewRevId}}), + send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}), NewTree2; true -> - send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + send_result(Client, Ref, conflict), AccTree end; {NewTree, _} -> @@ -674,7 +673,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> update_seq = LastSeq, revs_limit = RevsLimit } = Db, - Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], + Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList], % lookup up the old documents, if they exist. 
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( @@ -722,10 +721,10 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> update_local_docs(Db, []) -> {ok, Db}; update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || {_Client, #doc{id=Id}} <- Docs], + Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) -> + fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) -> case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); @@ -741,16 +740,16 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> true -> case Delete of false -> - send_result(Client, Id, {0, PrevRevs}, {ok, + send_result(Client, Ref, {ok, {0, ?l2b(integer_to_list(PrevRev + 1))}}), {update, {Id, {PrevRev + 1, Body}}}; true -> - send_result(Client, Id, {0, PrevRevs}, + send_result(Client, Ref, {ok, {0, <<"0">>}}), {remove, Id} end; false -> - send_result(Client, Id, {0, PrevRevs}, conflict), + send_result(Client, Ref, conflict), ignore end end, Docs, OldDocLookups),