This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0832393 Prevent chttpd multipart zombie processes
0832393 is described below
commit 083239353e919e897b97e8a96ee07cb42ca4eccd
Author: Jan Lehnardt <jan@apache.org>
AuthorDate: Tue Feb 13 15:32:29 2018 +0100
Prevent chttpd multipart zombie processes
Occasionally it's possible to lose track of our RPC workers in the main
multipart parsing code. This change monitors each worker process and
then exits if all workers have exited before the parser considers itself
finished.
Fixes part of #745
---
src/couch/src/couch_httpd_multipart.erl | 77 ++++++++++++++++++++++++++++-----
1 file changed, 65 insertions(+), 12 deletions(-)
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 6ce3c76..e556b28 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing ->
ok;
{get_bytes, Ref, From} ->
- C2 = orddict:update_counter(From, 1, Counters),
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
- mp_parse_atts(eof, NewAcc);
+ C2 = update_writer(From, Counters),
+ case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
+ abort_parsing ->
+ ok;
+ NewAcc ->
+ mp_parse_atts(eof, NewAcc)
+ end;
{'DOWN', ParentRef, _, _, _} ->
- exit(mp_reader_coordinator_died)
- after 3600000 ->
+ exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ ok;
+ C2 ->
+ NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
+ mp_parse_atts(eof, NewAcc)
+ end
+ after 300000 ->
ok
end
end.
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive {get_bytes, Ref, From} ->
- NewCounters = orddict:update_counter(From, 1, Counters),
+ NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
after 0 ->
% reply to as many writers as possible
NewWaiting = lists:filter(fun(Writer) ->
- WhichChunk = orddict:fetch(Writer, Counters),
+ {_, WhichChunk} = orddict:fetch(Writer, Counters),
ListIndex = WhichChunk - Offset,
if ListIndex =< length(Chunks) ->
Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
@@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end, Waiting),
% check if we can drop a chunk from the head of the list
- case Counters of
+ SmallestIndex = case Counters of
[] ->
- SmallestIndex = 0;
+ 0;
_ ->
- SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+ lists:min([C || {_WPid, {_WRef, C}} <- Counters])
end,
Size = length(Counters),
N = num_mp_writers(),
@@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end,
% we should wait for a writer if no one has written the last chunk
- LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+ LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]),
if LargestIndex >= (Offset + length(Chunks)) ->
% someone has written all possible chunks, keep moving
{Ref, NewChunks, NewOffset, Counters, NewWaiting};
@@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing;
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ abort_parsing;
+ C2 ->
+ RestWaiting = NewWaiting -- [WriterPid],
+ NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
+ maybe_send_data(NewAcc)
+ end;
{get_bytes, Ref, X} ->
- C2 = orddict:update_counter(X, 1, Counters),
+ C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
+ after 300000 ->
+ abort_parsing
end
end
end.
+%% Account for one more get_bytes request from WriterPid.
+%%
+%% Counters is an orddict mapping WriterPid => {MonitorRef, Count}.
+%% The first time a pid is seen it is monitored and stored as {Ref, 1}.
+%% Subsequent calls keep the same monitor ref and increment Count by one.
+%% Returns the updated orddict.
+update_writer(WriterPid, Counters) ->
+    case orddict:find(WriterPid, Counters) of
+        {ok, {MonRef, Seen}} ->
+            orddict:store(WriterPid, {MonRef, Seen + 1}, Counters);
+        error ->
+            % New writer: watch it so a crash shows up as a 'DOWN' message
+            MonRef = erlang:monitor(process, WriterPid),
+            orddict:store(WriterPid, {MonRef, 1}, Counters)
+    end.
+
+
+%% Forget a writer whose monitor has fired.
+%%
+%% WriterPid and WriterRef come from a 'DOWN' message. If they match the
+%% {MonitorRef, Count} entry stored for WriterPid in Counters, and the
+%% writer count tracked by num_mp_writers/0 is greater than one:
+%%   - the count is decremented;
+%%   - the pid is erased from Counters;
+%%   - the updated Counters orddict is returned (callers keep it as
+%%     their new state).
+%% Otherwise abort_parsing is returned. That covers the last writer, a
+%% known pid with a different monitor ref, and an unknown pid.
+remove_writer(WriterPid, WriterRef, Counters) ->
+    case orddict:find(WriterPid, Counters) of
+        {ok, {WriterRef, _}} ->
+            case num_mp_writers() of
+                N when N > 1 ->
+                    num_mp_writers(N - 1),
+                    % Return the counters themselves. num_mp_writers/1
+                    % yields erlang:put/2's previous value, which must not
+                    % leak out as the caller's Counters state.
+                    orddict:erase(WriterPid, Counters);
+                _ ->
+                    abort_parsing
+            end;
+        {ok, _} ->
+            % We got a different ref fired for a known worker
+            abort_parsing;
+        error ->
+            % Unknown worker pid?
+            abort_parsing
+    end.
+
+
%% Record N as the current number of multipart attachment writers in the
%% process dictionary (key mp_att_writers). Like erlang:put/2, this returns
%% the value previously stored under that key, or undefined.
num_mp_writers(N) ->
    put(mp_att_writers, N).
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.
|