couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1139825 - in /couchdb/trunk/src/couchdb: couch_api_wrap.erl couch_doc.erl couch_replicator_doc_copier.erl
Date Sun, 26 Jun 2011 16:04:57 GMT
Author: fdmanana
Date: Sun Jun 26 16:04:57 2011
New Revision: 1139825

URL: http://svn.apache.org/viewvc?rev=1139825&view=rev
Log:
Improved replicator's multipart sending/receiving

Messages exchanged between the multipart parser and the
client process are now tagged with references. This ensures
that, when retrying a multipart request, the client doesn't
get old data messages from the previous request attempt.

Modified:
    couchdb/trunk/src/couchdb/couch_api_wrap.erl
    couchdb/trunk/src/couchdb/couch_doc.erl
    couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl

Modified: couchdb/trunk/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.erl?rev=1139825&r1=1139824&r2=1139825&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.erl Sun Jun 26 16:04:57 2011
@@ -174,8 +174,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
             unlink(Self)
         end),
     receive
-    started ->
-        receive_docs_loop(Streamer, Fun, Acc)
+    {started_open_doc_revs, Ref} ->
+        receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
     end;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
@@ -459,24 +459,28 @@ atts_since_arg(UrlLen, [PA | Rest], Acc)
 %       the exported open_doc_revs/6 function. The restart should be
 %       transparent to the caller like any other Couch API function exported
 %       by this module.
-receive_docs_loop(Streamer, Fun, Acc) ->
+receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
     try
-        receive_docs(Streamer, Fun, Acc)
+        % Left only for debugging purposes via an interactive or remote shell
+        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
+        receive_docs(Streamer, Fun, Ref, Acc)
     catch
-    throw:restart ->
-        receive_docs_loop(Streamer, Fun, Acc)
+    error:{restart_open_doc_revs, NewRef} ->
+        receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
     end.
 
-receive_docs(Streamer, UserFun, UserAcc) ->
-    Streamer ! {get_headers, self()},
+receive_docs(Streamer, UserFun, Ref, UserAcc) ->
+    Streamer ! {get_headers, Ref, self()},
     receive
-    started ->
-        restart_remote_open_doc_revs();
-    {headers, Headers} ->
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {headers, Ref, Headers} ->
         case get_value("content-type", Headers) of
         {"multipart/related", _} = ContentType ->
             case doc_from_multi_part_stream(
-                ContentType, fun() -> receive_doc_data(Streamer) end) of
+                ContentType,
+                fun() -> receive_doc_data(Streamer, Ref) end,
+                Ref) of
             {ok, Doc, Parser} ->
                 case UserFun({ok, Doc}, UserAcc) of
                 {ok, UserAcc2} ->
@@ -484,98 +488,96 @@ receive_docs(Streamer, UserFun, UserAcc)
                 {skip, UserAcc2} ->
                     couch_doc:abort_multi_part_stream(Parser)
                 end,
-                receive_docs(Streamer, UserFun, UserAcc2)
+                receive_docs(Streamer, UserFun, Ref, UserAcc2)
             end;
         {"application/json", []} ->
             Doc = couch_doc:from_json_obj(
-                    ?JSON_DECODE(receive_all(Streamer, []))),
+                    ?JSON_DECODE(receive_all(Streamer, Ref, []))),
             {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
-            receive_docs(Streamer, UserFun, UserAcc2);
+            receive_docs(Streamer, UserFun, Ref, UserAcc2);
         {"application/json", [{"error","true"}]} ->
-            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, [])),
+            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
             Rev = get_value(<<"missing">>, ErrorProps),
             Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
             {_, UserAcc2} = UserFun(Result, UserAcc),
-            receive_docs(Streamer, UserFun, UserAcc2)
+            receive_docs(Streamer, UserFun, Ref, UserAcc2)
         end;
-    done ->
+    {done, Ref} ->
         {ok, UserAcc}
     end.
 
 
-restart_remote_open_doc_revs() ->
+restart_remote_open_doc_revs(Ref, NewRef) ->
     receive
-    {body_bytes, _} ->
-        restart_remote_open_doc_revs();
-    body_done ->
-        restart_remote_open_doc_revs();
-    done ->
-        restart_remote_open_doc_revs();
-    {headers, _} ->
-        restart_remote_open_doc_revs();
-    started ->
-        restart_remote_open_doc_revs()
+    {body_bytes, Ref, _} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {body_done, Ref} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {done, Ref} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {headers, Ref, _} ->
+        restart_remote_open_doc_revs(Ref, NewRef)
     after 0 ->
-        throw(restart)
+        erlang:error({restart_open_doc_revs, NewRef})
     end.
 
 
 remote_open_doc_revs_streamer_start(Parent) ->
     receive
-    {get_headers, _} ->
+    {get_headers, _Ref, Parent} ->
         remote_open_doc_revs_streamer_start(Parent);
-    {next_bytes, _} ->
+    {next_bytes, _Ref, Parent} ->
         remote_open_doc_revs_streamer_start(Parent)
     after 0 ->
-        Parent ! started
+        Parent ! {started_open_doc_revs, make_ref()}
     end.
 
 
-receive_all(Streamer, Acc) ->
-    Streamer ! {next_bytes, self()},
+receive_all(Streamer, Ref, Acc) ->
+    Streamer ! {next_bytes, Ref, self()},
     receive
-    started ->
-        restart_remote_open_doc_revs();
-    {body_bytes, Bytes} ->
-        receive_all(Streamer, [Bytes | Acc]);
-    body_done ->
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {body_bytes, Ref, Bytes} ->
+        receive_all(Streamer, Ref, [Bytes | Acc]);
+    {body_done, Ref} ->
         lists:reverse(Acc)
     end.
 
 
 mp_parse_mixed(eof) ->
-    receive {get_headers, From} ->
-        From ! done
+    receive {get_headers, Ref, From} ->
+        From ! {done, Ref}
     end;
 mp_parse_mixed({headers, H}) ->
-    receive {get_headers, From} ->
-        From ! {headers, H}
+    receive {get_headers, Ref, From} ->
+        From ! {headers, Ref, H}
     end,
     fun mp_parse_mixed/1;
 mp_parse_mixed({body, Bytes}) ->
-    receive {next_bytes, From} ->
-        From ! {body_bytes, Bytes}
+    receive {next_bytes, Ref, From} ->
+        From ! {body_bytes, Ref, Bytes}
     end,
     fun mp_parse_mixed/1;
 mp_parse_mixed(body_end) ->
-    receive {next_bytes, From} ->
-        From ! body_done;
-    {get_headers, From} ->
-        self() ! {get_headers, From}
+    receive {next_bytes, Ref, From} ->
+        From ! {body_done, Ref};
+    {get_headers, Ref, From} ->
+        self() ! {get_headers, Ref, From}
     end,
     fun mp_parse_mixed/1.
 
 
-receive_doc_data(Streamer) ->
-    Streamer ! {next_bytes, self()},
+receive_doc_data(Streamer, Ref) ->
+    Streamer ! {next_bytes, Ref, self()},
     receive
-    {body_bytes, Bytes} ->
-        {Bytes, fun() -> receive_doc_data(Streamer) end};
-    body_done ->
-        {<<>>, fun() -> receive_doc_data(Streamer) end}
+    {body_bytes, Ref, Bytes} ->
+        {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
+    {body_done, Ref} ->
+        {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
     end.
 
-doc_from_multi_part_stream(ContentType, DataFun) ->
+doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
     Self = self(),
     Parser = spawn_link(fun() ->
         {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
@@ -583,22 +585,23 @@ doc_from_multi_part_stream(ContentType, 
             fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
         unlink(Self)
         end),
-    Parser ! {get_doc_bytes, self()},
+    Parser ! {get_doc_bytes, Ref, self()},
     receive
-    started ->
+    {started_open_doc_revs, NewRef} ->
         unlink(Parser),
         exit(Parser, kill),
-        restart_remote_open_doc_revs();
-    {doc_bytes, DocBytes} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {doc_bytes, Ref, DocBytes} ->
         Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
         ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, self()},
+            Parser ! {get_bytes, Ref, self()},
             receive
-            started ->
+            {started_open_doc_revs, NewRef} ->
                 unlink(Parser),
                 exit(Parser, kill),
-                restart_remote_open_doc_revs();
-            {bytes, Bytes} ->
+                receive {bytes, Ref, _} -> ok after 0 -> ok end,
+                restart_remote_open_doc_revs(Ref, NewRef);
+            {bytes, Ref, Bytes} ->
                 Bytes
             end
         end,
@@ -721,8 +724,8 @@ stream_doc({JsonBytes, Atts, Boundary, L
         couch_doc:doc_to_multi_part_stream(
             Boundary, JsonBytes, Atts,
             fun(Data) ->
-                receive {get_data, From} ->
-                    From ! {data, Data}
+                receive {get_data, Ref, From} ->
+                    From ! {data, Ref, Data}
                 end
             end, true),
         unlink(Self)
@@ -733,7 +736,8 @@ stream_doc({0, Id}) ->
     erlang:erase({doc_streamer, Id}),
     eof;
 stream_doc({LenLeft, Id}) when LenLeft > 0 ->
-    erlang:get({doc_streamer, Id}) ! {get_data, self()},
-    receive {data, Data} ->
+    Ref = make_ref(),
+    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
+    receive {data, Ref, Data} ->
         {ok, Data, {LenLeft - iolist_size(Data), Id}}
     end.

Modified: couchdb/trunk/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_doc.erl?rev=1139825&r1=1139824&r2=1139825&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_doc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_doc.erl Sun Jun 26 16:04:57 2011
@@ -501,15 +501,16 @@ doc_from_multi_part_stream(ContentType, 
         unlink(Parent),
         Parent ! {self(), finished}
         end),
-    Parser ! {get_doc_bytes, self()},
+    Ref = make_ref(),
+    Parser ! {get_doc_bytes, Ref, self()},
     receive 
-    {doc_bytes, DocBytes} ->
+    {doc_bytes, Ref, DocBytes} ->
         Doc = from_json_obj(?JSON_DECODE(DocBytes)),
         % go through the attachments looking for 'follows' in the data,
         % replace with function that reads the data from MIME stream.
         ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, self()},
-            receive {bytes, Bytes} -> Bytes end
+            Parser ! {get_bytes, Ref, self()},
+            receive {bytes, Ref, Bytes} -> Bytes end
         end,
         Atts2 = lists:map(
             fun(#att{data=follows}=A) ->
@@ -536,8 +537,8 @@ mp_parse_doc({body, Bytes}, AccBytes) ->
         mp_parse_doc(Next, [Bytes | AccBytes])
     end;
 mp_parse_doc(body_end, AccBytes) ->
-    receive {get_doc_bytes, From} ->
-        From ! {doc_bytes, lists:reverse(AccBytes)}
+    receive {get_doc_bytes, Ref, From} ->
+        From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
     end,
     fun mp_parse_atts/1.
 
@@ -546,8 +547,8 @@ mp_parse_atts(eof) ->
 mp_parse_atts({headers, _H}) ->
     fun mp_parse_atts/1;
 mp_parse_atts({body, Bytes}) ->
-    receive {get_bytes, From} ->
-        From ! {bytes, Bytes}
+    receive {get_bytes, Ref, From} ->
+        From ! {bytes, Ref, Bytes}
     end,
     fun mp_parse_atts/1;
 mp_parse_atts(body_end) ->
@@ -560,9 +561,9 @@ abort_multi_part_stream(Parser) ->
 abort_multi_part_stream(Parser, MonRef) ->
     case is_process_alive(Parser) of
     true ->
-        Parser ! {get_bytes, self()},
+        Parser ! {get_bytes, nil, self()},
         receive
-        {bytes, _Bytes} ->
+        {bytes, nil, _Bytes} ->
              abort_multi_part_stream(Parser, MonRef);
         {'DOWN', MonRef, _, _, _} ->
              ok

Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1139825&r1=1139824&r2=1139825&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Sun Jun 26 16:04:57 2011
@@ -521,10 +521,10 @@ flush_doc(Target, #doc{id = Id, revs = {
             [Id, couch_doc:rev_to_str({Pos, RevId}),
                 couch_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]),
         {error, Error};
-    Tag:Err ->
+    throw:Err ->
         ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
             " to target database `~s`. Error: `~s`.",
             [Id, couch_doc:rev_to_str({Pos, RevId}),
-                couch_api_wrap:db_uri(Target), to_binary({Tag, Err})]),
+                couch_api_wrap:db_uri(Target), to_binary(Err)]),
         {error, Err}
     end.



Mime
View raw message