couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [36/36] couch commit: updated refs/heads/windsor-merge-211 to b7adb86
Date Tue, 05 Aug 2014 21:18:19 GMT
Handle duplicate doc updates in single commit

Before this commit, there were bugs in couch_db and couch_db_updater
which caused couch to incorrectly report that all duplicate docs in a
group commit failed from conflicts. The changes in this commit cause
couch to correctly report that one of the duplicate updates in the
group commit succeeded. This commit also changes behavior slightly by
applying duplicate doc updates in the order they were supplied. This
change is for consistency with CouchDB.

This work is based on the patch by Bob Dionne.

BugzID: 12540


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/b7adb869
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/b7adb869
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/b7adb869

Branch: refs/heads/windsor-merge-211
Commit: b7adb86903e662ad412634e8a3bd9102dced340c
Parents: 7f66153
Author: Benjamin Bastian <benjamin.bastian@gmail.com>
Authored: Tue Aug 27 11:26:58 2013 -0700
Committer: Robert Newson <rnewson@apache.org>
Committed: Tue Aug 5 22:16:43 2014 +0100

----------------------------------------------------------------------
 src/couch_db.erl         | 95 ++++++++++++++++++++++++++-----------------
 src/couch_db_updater.erl | 52 +++++++++++++++++------
 src/couch_doc.erl        | 12 +++---
 3 files changed, 103 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/b7adb869/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index 0b9e490..b0c7894 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -500,11 +500,17 @@ update_docs(Db, Docs) ->
 % group_alike_docs groups the sorted documents into sublist buckets, by id.
 % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 group_alike_docs(Docs) ->
-    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
-    group_alike_docs(Sorted, []).
+    % Here we're just asserting that our doc sort is stable so that
+    % if we have duplicate docids we don't have to worry about the
+    % behavior of lists:sort/2 which isn't documented anywhere as
+    % being stable.
+    WithPos = lists:zip(Docs, lists:seq(1, length(Docs))),
+    SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end,
+    SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)],
+    group_alike_docs(SortedDocs, []).
 
 group_alike_docs([], Buckets) ->
-    lists:reverse(Buckets);
+    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
 group_alike_docs([Doc|Rest], []) ->
     group_alike_docs(Rest, [[Doc]]);
 group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
@@ -627,10 +633,10 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
 
 prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
         AccFatalErrors) ->
-   {AccPrepped, AccFatalErrors};
+    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
+    {AccPrepped2, AccFatalErrors};
 prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
-    [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
         fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
@@ -645,11 +651,11 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
                 ok ->
                     {[Doc | AccBucket], AccErrors2};
                 Error ->
-                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+                    {AccBucket, [{doc_tag(Doc), Error} | AccErrors2]}
                 end;
             _ ->
                 % old revs specified but none exist, a conflict
-                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+                {AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -670,9 +676,9 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
                     LeafRevsDict, AllowConflict) of
             {ok, Doc2} ->
                 {[Doc2 | Docs2Acc], AccErrors2};
-            {Error, #doc{id=Id,revs=Revs}} ->
+            {Error, _} ->
                 % Record the error
-                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+                {Docs2Acc, [{doc_tag(Doc), Error} |AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -687,7 +693,8 @@ update_docs(Db, Docs, Options) ->
 prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
     Errors2 = [{{Id, {Pos, Rev}}, Error} ||
             {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
-    {lists:reverse(AccPrepped), lists:reverse(Errors2)};
+    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
+    {AccPrepped2, lists:reverse(Errors2)};
 prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
     case OldInfo of
     not_found ->
@@ -782,10 +789,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
     {lists:reverse(OutBuckets), IdRevsAcc};
 new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
     {NewBucket, IdRevsAcc3} = lists:mapfoldl(
-        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+        fun(#doc{revs={Start, RevIds}}=Doc, IdRevsAcc2)->
         NewRevId = new_revid(Doc),
         {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
-            [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+            [{doc_tag(Doc), {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
     end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 
@@ -802,9 +809,23 @@ check_dup_atts2(_) ->
     ok.
 
 
-update_docs(Db, Docs, Options, replicated_changes) ->
+tag_docs([]) ->
+    [];
+tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
+
+doc_tag(#doc{meta=Meta}) ->
+    case lists:keyfind(ref, 1, Meta) of
+        {ref, Ref} when is_reference(Ref) -> Ref;
+        false -> throw(doc_not_tagged);
+        Else -> throw({invalid_doc_tag, Else})
+    end.
+
+update_docs(Db, Docs0, Options, replicated_changes) ->
     increment_stat(Db, {couchdb, database_writes}),
+    Docs = tag_docs(Docs0),
     DocBuckets = before_docs_update(Db, group_alike_docs(Docs)),
+
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
             fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
@@ -827,21 +848,17 @@ update_docs(Db, Docs, Options, replicated_changes) ->
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
 
-update_docs(Db, Docs, Options, interactive_edit) ->
+update_docs(Db, Docs0, Options, interactive_edit) ->
     increment_stat(Db, {couchdb, database_writes}),
     AllOrNothing = lists:member(all_or_nothing, Options),
-    % go ahead and generate the new revision ids for the documents.
-    % separate out the NonRep documents from the rest of the documents
-
-    {Docs2, NonRepDocs} = lists:foldl(
-         fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
-            case Id of
-            <<?LOCAL_DOC_PREFIX, _/binary>> ->
-                {DocsAcc, [Doc | NonRepDocsAcc]};
-            Id->
-                {[Doc | DocsAcc], NonRepDocsAcc}
-            end
-        end, {[], []}, Docs),
+    Docs = tag_docs(Docs0),
+
+    % Separate _local docs from normal docs
+    IsLocal = fun
+        (#doc{id= <<?LOCAL_DOC_PREFIX, _/binary>>}) -> true;
+        (_) -> false
+    end,
+    {NonRepDocs, Docs2} = lists:partition(IsLocal, Docs),
 
     DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
 
@@ -868,12 +885,14 @@ update_docs(Db, Docs, Options, interactive_edit) ->
     end,
 
     if (AllOrNothing) and (PreCommitFailures /= []) ->
-        {aborted, lists:map(
-            fun({{Id,{Pos, [RevId|_]}}, Error}) ->
-                {{Id, {Pos, RevId}}, Error};
-            ({{Id,{0, []}}, Error}) ->
-                {{Id, {0, <<>>}}, Error}
-            end, PreCommitFailures)};
+        RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]),
+        {aborted, lists:map(fun({Ref, Error}) ->
+            #doc{id=Id,revs={Start,RevIds}} = dict:fetch(Ref, RefErrorDict),
+            case {Start, RevIds} of
+                {Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error};
+                {0, []} -> {{Id, {0, <<>>}}, Error}
+            end
+        end, PreCommitFailures)};
     true ->
         Options2 = if AllOrNothing -> [merge_conflicts];
                 true -> [] end ++ Options,
@@ -885,12 +904,12 @@ update_docs(Db, Docs, Options, interactive_edit) ->
 
         {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
 
-        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
-        {ok, lists:map(
-            fun(#doc{id=Id,revs={Pos, RevIds}}) ->
-                {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
-                Result
-            end, Docs)}
+        ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) ->
+            dict:store(Key, Resp, ResultsAcc)
+        end, dict:from_list(IdRevs), CommitResults ++ PreCommitFailures),
+        {ok, lists:map(fun(Doc) ->
+            dict:fetch(doc_tag(Doc), ResultsDict)
+        end, Docs)}
     end.
 
 % Returns the first available document on disk. Input list is a full rev path

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/b7adb869/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 09028bd..7c2a43a 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -264,7 +264,7 @@ handle_cast(Msg, #db{name = Name} = Db) ->
 
 handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
         FullCommit}, Db) ->
-    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+    GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs),
     if NonRepDocs == [] ->
         {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
                 [Client], MergeConflicts, FullCommit);
@@ -319,6 +319,20 @@ handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+maybe_tag_grouped_docs(Client, GroupedDocs) ->
+    lists:map(fun(DocGroup) ->
+        [{Client, maybe_tag_doc(D)} || D <- DocGroup]
+    end, GroupedDocs).
+
+maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) ->
+    case lists:keymember(ref, 1, Meta0) of
+        true ->
+            Doc;
+        false ->
+            Key = {Id, {Pos-1, PrevRevs}},
+            Doc#doc{meta=[{ref, Key} | Meta0]}
+    end.
+
 merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) ->
     [A++B | merge_updates(RestA, RestB)];
 merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y ->
@@ -337,8 +351,7 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
         % updaters than deal with their possible conflicts, and local docs
         % writes are relatively rare. Can be optmized later if really needed.
         {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
-            GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
-                    || DocGroup <- GroupedDocs],
+            GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs),
             GroupedDocsAcc2 =
                 merge_updates(GroupedDocsAcc, GroupedDocs2),
             collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
@@ -597,9 +610,16 @@ flush_trees(#db{fd = Fd} = Db,
     flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
 
 
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Doc, NewResult) ->
     % used to send a result to the client
-    catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+    catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
+
+doc_tag(#doc{meta=Meta}) ->
+    case lists:keyfind(ref, 1, Meta) of
+        {ref, Ref} -> Ref;
+        false -> throw(no_doc_tag);
+        Else -> throw({invalid_doc_tag, Else})
+    end.
 
 merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
     {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -613,7 +633,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                 case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
                     Limit) of
                 {_NewTree, conflicts} when (not OldDeleted) ->
-                    send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                    send_result(Client, NewDoc, conflict),
                     {AccTree, OldDeleted};
                 {NewTree, conflicts} when PrevRevs /= [] ->
                     % Check to be sure if prev revision was specified, it's
@@ -625,7 +645,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                     if IsPrevLeaf ->
                         {NewTree, OldDeleted};
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, NewDoc, conflict),
                         {AccTree, OldDeleted}
                     end;
                 {NewTree, no_conflicts} when  AccTree == NewTree ->
@@ -644,11 +664,11 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                         {NewTree2, _} = couch_key_tree:merge(AccTree,
                                 couch_doc:to_path(NewDoc2), Limit),
                         % we changed the rev id, this tells the caller we did
-                        send_result(Client, Id, {Pos-1,PrevRevs},
-                            {ok, {OldPos + 1, NewRevId}}),
+                        send_result(Client, NewDoc,
+                                {ok, {OldPos + 1, NewRevId}}),
                         {NewTree2, OldDeleted};
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, NewDoc, conflict),
                         {AccTree, OldDeleted}
                     end;
                 {NewTree, _} ->
@@ -754,7 +774,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
     Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
-        fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, _OldDocLookup) ->
+        fun({Client, NewDoc}, _OldDocLookup) ->
+            #doc{
+                id = Id,
+                deleted = Delete,
+                revs = {0, PrevRevs},
+                body = Body
+            } = NewDoc,
             case PrevRevs of
             [RevStr|_] ->
                 PrevRev = list_to_integer(?b2l(RevStr));
@@ -771,11 +797,11 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
             % true ->
                 case Delete of
                     false ->
-                        send_result(Client, Id, {0, PrevRevs}, {ok,
+                        send_result(Client, NewDoc, {ok,
                                 {0, ?l2b(integer_to_list(PrevRev + 1))}}),
                         {update, {Id, {PrevRev + 1, Body}}};
                     true  ->
-                        send_result(Client, Id, {0, PrevRevs},
+                        send_result(Client, NewDoc,
                                 {ok, {0, <<"0">>}}),
                         {remove, Id}
                 end%;

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/b7adb869/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch_doc.erl b/src/couch_doc.erl
index 05202f4..14f6a4f 100644
--- a/src/couch_doc.erl
+++ b/src/couch_doc.erl
@@ -70,7 +70,7 @@ revs_to_strs([{Pos, RevId}| Rest]) ->
     [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
 
 to_json_meta(Meta) ->
-    lists:map(
+    lists:flatmap(
         fun({revs_info, Start, RevsInfo}) ->
             {JsonRevsInfo, _Pos}  = lists:mapfoldl(
                 fun({RevId, Status}, PosAcc) ->
@@ -78,13 +78,15 @@ to_json_meta(Meta) ->
                         {<<"status">>, ?l2b(atom_to_list(Status))}]},
                     {JsonObj, PosAcc - 1}
                 end, Start, RevsInfo),
-            {<<"_revs_info">>, JsonRevsInfo};
+            [{<<"_revs_info">>, JsonRevsInfo}];
         ({local_seq, Seq}) ->
-            {<<"_local_seq">>, Seq};
+            [{<<"_local_seq">>, Seq}];
         ({conflicts, Conflicts}) ->
-            {<<"_conflicts">>, revs_to_strs(Conflicts)};
+            [{<<"_conflicts">>, revs_to_strs(Conflicts)}];
         ({deleted_conflicts, DConflicts}) ->
-            {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
+            [{<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}];
+        (_) ->
+            []
         end, Meta).
 
 to_json_attachments(Attachments, Options) ->


Mime
View raw message