couchdb-commits mailing list archives

From kxe...@apache.org
Subject [08/25] couch commit: updated refs/heads/master to 92598cd
Date Thu, 15 Oct 2015 16:35:42 GMT
Move doc_from_multi_part_stream into httpd_multipart


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/f265acfb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/f265acfb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/f265acfb

Branch: refs/heads/master
Commit: f265acfb33cc66cdb25f6432bc1064d9f737e14e
Parents: b6961c5
Author: ILYA Khlopotov <iilyak@ca.ibm.com>
Authored: Thu Nov 27 14:31:06 2014 -0800
Committer: ILYA Khlopotov <iilyak@ca.ibm.com>
Committed: Wed May 6 06:55:58 2015 -0700

----------------------------------------------------------------------
 src/couch_doc.erl             | 194 ++-----------------------------------
 src/couch_httpd_db.erl        |   4 +-
 src/couch_httpd_multipart.erl | 190 ++++++++++++++++++++++++++++++++++++
 3 files changed, 199 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
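
This commit lifts the multipart document parsing out of couch_doc into a new couch_httpd_multipart module; couch_doc keeps a thin, deprecated abort_multi_part_stream/1 shim plus restart_open_doc_revs/3. The return shape of the moved entry point is unchanged, so call sites only swap the module name. A minimal caller-side sketch, modeled on the couch_httpd_db hunk further down (couch_doc_from_req/3 appears in that hunk; update_doc_result/4 is a hypothetical stand-in for the existing update path, not part of this commit):

    %% Sketch only, not part of this commit: how a handler consumes a
    %% multipart/related PUT body after the move to couch_httpd_multipart.
    handle_multipart_put(ContentType, DataFun, Req, Db, DocId) ->
        {ok, Doc0, WaitFun, Parser} =
            couch_httpd_multipart:doc_from_multi_part_stream(ContentType, DataFun),
        Doc = couch_doc_from_req(Req, DocId, Doc0),
        try
            Result = update_doc_result(Req, Db, DocId, Doc),  % hypothetical update step
            WaitFun(),  % block until the parser process exits
            Result
        catch throw:Err ->
            % Document rejected, e.g. by a validate_doc_update function:
            % tell the parser to stop streaming attachment data.
            couch_httpd_multipart:abort_multi_part_stream(Parser),
            throw(Err)
        end.
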


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/f265acfb/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch_doc.erl b/src/couch_doc.erl
index d152a2a..a7fd147 100644
--- a/src/couch_doc.erl
+++ b/src/couch_doc.erl
@@ -15,14 +15,15 @@
 -export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
 -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
 -export([validate_docid/1, get_validate_doc_fun/1]).
--export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
+
 -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
--export([abort_multi_part_stream/1, restart_open_doc_revs/3]).
+-export([abort_multi_part_stream/1]).
+-export([restart_open_doc_revs/3]).
 -export([to_path/1]).
--export([mp_parse_doc/2]).
+
 -export([with_ejson_body/1]).
 -export([is_deleted/1]).
--export([num_mp_writers/1]).
+
 
 -include_lib("couch/include/couch_db.hrl").
 
@@ -452,190 +453,9 @@ atts_to_mp([Att | RestAtts], Boundary, WriteFun, SendEncodedAtts)  ->
             atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts)
     end.
 
-
-doc_from_multi_part_stream(ContentType, DataFun) ->
-    doc_from_multi_part_stream(ContentType, DataFun, make_ref()).
-
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
-    Parent = self(),
-    NumMpWriters = num_mp_writers(),
-    {Parser, ParserRef} = spawn_monitor(fun() ->
-        ParentRef = erlang:monitor(process, Parent),
-        put(mp_parent_ref, ParentRef),
-        num_mp_writers(NumMpWriters),
-        {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
-            ContentType, DataFun,
-            fun(Next) -> mp_parse_doc(Next, []) end),
-        unlink(Parent)
-        end),
-    Parser ! {get_doc_bytes, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        restart_open_doc_revs(Parser, Ref, NewRef);
-    {doc_bytes, Ref, DocBytes} ->
-        Doc = from_json_obj(?JSON_DECODE(DocBytes)),
-        % we'll send the Parser process ID to the remote nodes so they can
-        % retrieve their own copies of the attachment data
-        WithParser = fun(follows) -> {follows, Parser, Ref}; (D) -> D end,
-        Atts = [couch_att:transform(data, WithParser, A) || A <- Doc#doc.atts],
-        WaitFun = fun() ->
-            receive {'DOWN', ParserRef, _, _, _} -> ok end,
-            erlang:put(mochiweb_request_recv, true)
-        end,
-        {ok, Doc#doc{atts=Atts}, WaitFun, Parser};
-    {'DOWN', ParserRef, _, _, normal} ->
-        ok;
-    {'DOWN', ParserRef, process, Parser, {{nocatch, {Error, Msg}}, _}} ->
-        couch_log:error("Multipart streamer ~p died with reason ~p",
-                        [ParserRef, Msg]),
-        throw({Error, Msg});
-    {'DOWN', ParserRef, _, _, Reason} ->
-        couch_log:error("Multipart streamer ~p died with reason ~p",
-                        [ParserRef, Reason]),
-        throw({error, Reason})
-    end.
-
-
-mp_parse_doc({headers, H}, []) ->
-    case couch_util:get_value("content-type", H) of
-    {"application/json", _} ->
-        fun (Next) ->
-            mp_parse_doc(Next, [])
-        end;
-    _ ->
-        throw({bad_ctype, <<"Content-Type must be application/json">>})
-    end;
-mp_parse_doc({body, Bytes}, AccBytes) ->
-    fun (Next) ->
-        mp_parse_doc(Next, [Bytes | AccBytes])
-    end;
-mp_parse_doc(body_end, AccBytes) ->
-    receive {get_doc_bytes, Ref, From} ->
-        From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
-    end,
-    fun(Next) ->
-        mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
-    end.
-
-mp_parse_atts({headers, _}, Acc) ->
-    fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts(body_end, Acc) ->
-    fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
-    case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of
-        abort_parsing ->
-            fun(Next) -> mp_abort_parse_atts(Next, nil) end;
-        NewAcc ->
-            fun(Next) -> mp_parse_atts(Next, NewAcc) end
-    end;
-mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
-    N = num_mp_writers(),
-    M = length(Counters),
-    case (M == N) andalso Chunks == [] of
-    true ->
-        ok;
-    false ->
-        ParentRef = get(mp_parent_ref),
-        receive
-        abort_parsing ->
-            ok;
-        {get_bytes, Ref, From} ->
-            C2 = orddict:update_counter(From, 1, Counters),
-            NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
-            mp_parse_atts(eof, NewAcc);
-        {'DOWN', ParentRef, _, _, _} ->
-            exit(mp_reader_coordinator_died)
-        after 3600000 ->
-            ok
-        end
-    end.
-
-mp_abort_parse_atts(eof, _) ->
-    ok;
-mp_abort_parse_atts(_, _) ->
-    fun(Next) -> mp_abort_parse_atts(Next, nil) end.
-
-maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
-    receive {get_bytes, Ref, From} ->
-        NewCounters = orddict:update_counter(From, 1, Counters),
-        maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
-    after 0 ->
-        % reply to as many writers as possible
-        NewWaiting = lists:filter(fun(Writer) ->
-            WhichChunk = orddict:fetch(Writer, Counters),
-            ListIndex = WhichChunk - Offset,
-            if ListIndex =< length(Chunks) ->
-                Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
-                false;
-            true ->
-                true
-            end
-        end, Waiting),
-
-        % check if we can drop a chunk from the head of the list
-        case Counters of
-        [] ->
-            SmallestIndex = 0;
-        _ ->
-            SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
-        end,
-        Size = length(Counters),
-        N = num_mp_writers(),
-        if Size == N andalso SmallestIndex == (Offset+1) ->
-            NewChunks = tl(Chunks),
-            NewOffset = Offset+1;
-        true ->
-            NewChunks = Chunks,
-            NewOffset = Offset
-        end,
-
-        % we should wait for a writer if no one has written the last chunk
-        LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
-        if LargestIndex  >= (Offset + length(Chunks)) ->
-            % someone has written all possible chunks, keep moving
-            {Ref, NewChunks, NewOffset, Counters, NewWaiting};
-        true ->
-            ParentRef = get(mp_parent_ref),
-            receive
-            abort_parsing ->
-                abort_parsing;
-            {'DOWN', ParentRef, _, _, _} ->
-                exit(mp_reader_coordinator_died);
-            {get_bytes, Ref, X} ->
-                C2 = orddict:update_counter(X, 1, Counters),
-                maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
-            end
-        end
-    end.
-
-
-num_mp_writers(N) ->
-    erlang:put(mp_att_writers, N).
-
-
-num_mp_writers() ->
-    case erlang:get(mp_att_writers) of
-        undefined -> 1;
-        Count -> Count
-    end.
-
-
 abort_multi_part_stream(Parser) ->
-    MonRef = erlang:monitor(process, Parser),
-    Parser ! abort_parsing,
-    receive
-        {'DOWN', MonRef, _, _, _} -> ok
-    after 60000 ->
-        % One minute is quite on purpose for this timeout. We
-        % want to try and read data to keep the socket open
-        % when possible but we also don't want to just make
-        % this a super long timeout because people have to
-        % wait this long to see if they just had an error
-        % like a validate_doc_update failure.
-        throw(multi_part_abort_timeout)
-    end.
-
+    couch_log:warning("couch_doc:abort_multi_part_stream/1 is deprecated use couch_httpd_multipart:abort_multipart_stream/1", []),
+    couch_httpd_multipart:abort_multipart_stream(Parser).
 
 restart_open_doc_revs(Parser, Ref, NewRef) ->
     unlink(Parser),

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/f265acfb/src/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl
index 938521c..46f9fc9 100644
--- a/src/couch_httpd_db.erl
+++ b/src/couch_httpd_db.erl
@@ -578,7 +578,7 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
 
     case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
     ("multipart/related;" ++ _) = ContentType ->
-        {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(
+        {ok, Doc0, WaitFun, Parser} = couch_httpd_multipart:doc_from_multi_part_stream(
             ContentType, fun() -> receive_request_data(Req) end),
         Doc = couch_doc_from_req(Req, DocId, Doc0),
         try
@@ -587,7 +587,7 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
             Result
         catch throw:Err ->
             % Document rejected by a validate_doc_update function.
-            couch_doc:abort_multi_part_stream(Parser),
+            couch_httpd_multipart:abort_multi_part_stream(Parser),
             throw(Err)
         end;
     _Else ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/f265acfb/src/couch_httpd_multipart.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_multipart.erl b/src/couch_httpd_multipart.erl
new file mode 100644
index 0000000..0d3131e
--- /dev/null
+++ b/src/couch_httpd_multipart.erl
@@ -0,0 +1,190 @@
+-module(couch_httpd_multipart).
+
+-export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
+
+-export([abort_multi_part_stream/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+    doc_from_multi_part_stream(ContentType, DataFun, make_ref()).
+
+
+doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
+    Parent = self(),
+    NumMpWriters = num_mp_writers(),
+    {Parser, ParserRef} = spawn_monitor(fun() ->
+        ParentRef = erlang:monitor(process, Parent),
+        put(mp_parent_ref, ParentRef),
+        num_mp_writers(NumMpWriters),
+        {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
+            ContentType, DataFun,
+            fun(Next) -> mp_parse_doc(Next, []) end),
+        unlink(Parent)
+        end),
+    Parser ! {get_doc_bytes, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        couch_doc:restart_open_doc_revs(Parser, Ref, NewRef);
+    {doc_bytes, Ref, DocBytes} ->
+        Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
+        % we'll send the Parser process ID to the remote nodes so they can
+        % retrieve their own copies of the attachment data
+        WithParser = fun(follows) -> {follows, Parser, Ref}; (D) -> D end,
+        Atts = [couch_att:transform(data, WithParser, A) || A <- Doc#doc.atts],
+        WaitFun = fun() ->
+            receive {'DOWN', ParserRef, _, _, _} -> ok end,
+            erlang:put(mochiweb_request_recv, true)
+        end,
+        {ok, Doc#doc{atts=Atts}, WaitFun, Parser};
+    {'DOWN', ParserRef, _, _, normal} ->
+        ok;
+    {'DOWN', ParserRef, process, Parser, {{nocatch, {Error, Msg}}, _}} ->
+        couch_log:error("Multipart streamer ~p died with reason ~p",
+                        [ParserRef, Msg]),
+        throw({Error, Msg});
+    {'DOWN', ParserRef, _, _, Reason} ->
+        couch_log:error("Multipart streamer ~p died with reason ~p",
+                        [ParserRef, Reason]),
+        throw({error, Reason})
+    end.
+
+
+mp_parse_doc({headers, H}, []) ->
+    case couch_util:get_value("content-type", H) of
+    {"application/json", _} ->
+        fun (Next) ->
+            mp_parse_doc(Next, [])
+        end;
+    _ ->
+        throw({bad_ctype, <<"Content-Type must be application/json">>})
+    end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+    fun (Next) ->
+        mp_parse_doc(Next, [Bytes | AccBytes])
+    end;
+mp_parse_doc(body_end, AccBytes) ->
+    receive {get_doc_bytes, Ref, From} ->
+        From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
+    end,
+    fun(Next) ->
+        mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
+    end.
+
+mp_parse_atts({headers, _}, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts(body_end, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
+    case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of
+        abort_parsing ->
+            fun(Next) -> mp_abort_parse_atts(Next, nil) end;
+        NewAcc ->
+            fun(Next) -> mp_parse_atts(Next, NewAcc) end
+    end;
+mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
+    N = num_mp_writers(),
+    M = length(Counters),
+    case (M == N) andalso Chunks == [] of
+    true ->
+        ok;
+    false ->
+        ParentRef = get(mp_parent_ref),
+        receive
+        abort_parsing ->
+            ok;
+        {get_bytes, Ref, From} ->
+            C2 = orddict:update_counter(From, 1, Counters),
+            NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
+            mp_parse_atts(eof, NewAcc);
+        {'DOWN', ParentRef, _, _, _} ->
+            exit(mp_reader_coordinator_died)
+        after 3600000 ->
+            ok
+        end
+    end.
+
+mp_abort_parse_atts(eof, _) ->
+    ok;
+mp_abort_parse_atts(_, _) ->
+    fun(Next) -> mp_abort_parse_atts(Next, nil) end.
+
+maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
+    receive {get_bytes, Ref, From} ->
+        NewCounters = orddict:update_counter(From, 1, Counters),
+        maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
+    after 0 ->
+        % reply to as many writers as possible
+        NewWaiting = lists:filter(fun(Writer) ->
+            WhichChunk = orddict:fetch(Writer, Counters),
+            ListIndex = WhichChunk - Offset,
+            if ListIndex =< length(Chunks) ->
+                Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
+                false;
+            true ->
+                true
+            end
+        end, Waiting),
+
+        % check if we can drop a chunk from the head of the list
+        case Counters of
+        [] ->
+            SmallestIndex = 0;
+        _ ->
+            SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+        end,
+        Size = length(Counters),
+        N = num_mp_writers(),
+        if Size == N andalso SmallestIndex == (Offset+1) ->
+            NewChunks = tl(Chunks),
+            NewOffset = Offset+1;
+        true ->
+            NewChunks = Chunks,
+            NewOffset = Offset
+        end,
+
+        % we should wait for a writer if no one has written the last chunk
+        LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+        if LargestIndex  >= (Offset + length(Chunks)) ->
+            % someone has written all possible chunks, keep moving
+            {Ref, NewChunks, NewOffset, Counters, NewWaiting};
+        true ->
+            ParentRef = get(mp_parent_ref),
+            receive
+            abort_parsing ->
+                abort_parsing;
+            {'DOWN', ParentRef, _, _, _} ->
+                exit(mp_reader_coordinator_died);
+            {get_bytes, Ref, X} ->
+                C2 = orddict:update_counter(X, 1, Counters),
+                maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
+            end
+        end
+    end.
+
+
+num_mp_writers(N) ->
+    erlang:put(mp_att_writers, N).
+
+
+num_mp_writers() ->
+    case erlang:get(mp_att_writers) of
+        undefined -> 1;
+        Count -> Count
+    end.
+
+
+abort_multi_part_stream(Parser) ->
+    MonRef = erlang:monitor(process, Parser),
+    Parser ! abort_parsing,
+    receive
+        {'DOWN', MonRef, _, _, _} -> ok
+    after 60000 ->
+        % One minute is quite on purpose for this timeout. We
+        % want to try and read data to keep the socket open
+        % when possible but we also don't want to just make
+        % this a super long timeout because people have to
+        % wait this long to see if they just had an error
+        % like a validate_doc_update failure.
+        throw(multi_part_abort_timeout)
+    end.
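
The new module carries the writer coordination protocol over unchanged: attachments whose data is the atom follows are rewritten to {follows, Parser, Ref}, and each writer asks the parser for chunks with {get_bytes, Ref, self()} and receives {bytes, Ref, Chunk} replies (see maybe_send_data/1 above). A rough sketch of a consumer loop under those assumptions; fetch_att_data/3 and the length-based stop condition are illustrative only and not part of this commit (the real writers live in the attachment handling code and know the declared lengths):

    %% Sketch only: drain streamed attachment chunks from the multipart parser.
    %% Parser and Ref come from a {follows, Parser, Ref} attachment entry;
    %% Len is assumed to be the number of bytes this consumer expects.
    fetch_att_data(Parser, Ref, Len) ->
        fetch_att_data(Parser, Ref, Len, []).

    fetch_att_data(Parser, Ref, Len, Acc) ->
        Parser ! {get_bytes, Ref, self()},
        receive
            {bytes, Ref, Chunk} ->
                Acc1 = [Chunk | Acc],
                case iolist_size(Acc1) >= Len of
                    true  -> {ok, lists:reverse(Acc1)};
                    false -> fetch_att_data(Parser, Ref, Len, Acc1)
                end
        after 60000 ->
            %% Give up rather than hang if the parser died silently.
            exit(att_fetch_timeout)
        end.
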

