couchdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adam Kocoloski <kocol...@apache.org>
Subject Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl
Date Sat, 16 May 2009 20:20:39 GMT
Ok, so here's a start at reworking some of the memory management and  
buffering calculations.  It fixes the regression where attachment  
memory wasn't being included in the memory utilization numbers, and it  
also includes ibrowse memory utilization for attachments (which is  
larger than Couch's).

The decision to flush the buffer (to disk or to the remote target  
server) is dependent on the number of docs in the buffer, the  
approximate number of attachments, and the memory utilization.  I  
estimate the number of attachments as 0.5*nlinks, since every  
attachment download spawns two processes: one dedicated ibrowse worker  
and the attachment receiver.  The dedicated ibrowse workers get the  
attachments out of the connection pool and let us keep a better eye on  
their memory usage.

Each of the thresholds is currently just defined as a macro at the top  
of the module.  I haven't done any work on adjusting these thresholds  
dynamically or checkpointing as a function of elapsed time.

The replication module is getting pretty hairy again; in my opinion  
it's probably time to refactor out the attachment stuff into its own  
module.  I may get around to that tomorrow if no one objects.

Best, Adam

On May 16, 2009, at 2:58 PM, kocolosk@apache.org wrote:

> Author: kocolosk
> Date: Sat May 16 18:58:18 2009
> New Revision: 775507
>
> URL: http://svn.apache.org/viewvc?rev=775507&view=rev
> Log:
> replicator memory management and buffer flush calculation updates
>
> * new should_flush fun considers ndocs, nattachments, memory in  
> making decision
> * memory utilized by attachment receivers is accounted for
> * download attachments using standalone connections instead of conn  
> pool.  This
>  prevents a document request from getting stuck behind a huge  
> attachment, which
>  would prevent us from triggering a buffer flush in time.  We also  
> consider the
>  memory utilization of the standalone ibrowse connection in  
> should_flush
>
> Modified:
>    couchdb/trunk/src/couchdb/couch_rep.erl
>
> Modified: couchdb/trunk/src/couchdb/couch_rep.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775507&r1=775506&r2=775507&view=diff
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> ======================================================================
> --- couchdb/trunk/src/couchdb/couch_rep.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep.erl Sat May 16 18:58:18 2009
> @@ -17,7 +17,12 @@
>
> -export([replicate/2]).
>
> +-define(BUFFER_NDOCS, 1000).
> +-define(BUFFER_NATTACHMENTS, 50).
> +-define(BUFFER_MEMORY, 10000000). %% bytes
> +
> -include("couch_db.hrl").
> +-include("../ibrowse/ibrowse.hrl").
>
> %% @spec replicate(Source::binary(), Target::binary()) ->
> %%      {ok, Stats} | {error, Reason}
> @@ -202,7 +207,8 @@
>     ets:update_counter(Stats, docs_read, length(Docs)),
>
>     %% save them (maybe in a buffer)
> -    {NewBuffer, NewContext} = case couch_util:should_flush() of
> +    {NewBuffer, NewContext} =
> +    case should_flush(lists:flatlength([Docs|Buffer])) of
>         true ->
>             Docs2 = lists:flatten([Docs|Buffer]),
>             {ok, Errors} = update_docs(Target, Docs2, [],  
> replicated_changes),
> @@ -222,7 +228,7 @@
>     ets:update_counter(State#state.stats, total_revs, RevsCount),
>     case State#state.listeners of
>     [] ->
> -        % still waiting for the first listener to sen a request
> +        % still waiting for the first listener to send a request
>         {noreply, State#state{current_seq=LastSeq,done=true}};
>     _ ->
>         {stop, normal, ok, State#state{current_seq=LastSeq}}
> @@ -327,13 +333,13 @@
>         [Id, couch_doc:rev_to_str(Rev), Error]),
>     dump_update_errors(Rest).
>
> -attachment_loop(ReqId) ->
> +attachment_loop(ReqId, Conn) ->
>     couch_util:should_flush(),
>     receive
>         {From, {set_req_id, NewId}} ->
>             %% we learn the ReqId to listen for
>             From ! {self(), {ok, NewId}},
> -            attachment_loop(NewId);
> +            attachment_loop(NewId, Conn);
>         {ibrowse_async_headers, ReqId, Status, Headers} ->
>             %% we got header, give the controlling process a chance  
> to react
>             receive
> @@ -343,37 +349,42 @@
>                     receive
>                         {From, continue} ->
>                             %% normal case
> -                            attachment_loop(ReqId);
> +                            attachment_loop(ReqId, Conn);
>                         {From, fail} ->
>                             %% error, failure code
>                             ?LOG_ERROR(
>                                 "streaming attachment failed with  
> status ~p",
>                                 [Status]),
> +                            catch ibrowse:stop_worker_process(Conn),
>                             exit(attachment_request_failed);
>                         {From, stop_ok} ->
>                             %% stop looping, controller will start a  
> new loop
> +                            catch ibrowse:stop_worker_process(Conn),
>                             stop_ok
>                     end
>             end,
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, {chunk_start,_}} ->
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, chunk_end} ->
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, {error, Err}} ->
>             ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
> +            catch ibrowse:stop_worker_process(Conn),
>             exit(attachment_request_failed);
>         {ibrowse_async_response, ReqId, Data} ->
>             receive {From, gimme_data} -> From ! {self(), Data} end,
> -            attachment_loop(ReqId);
> -        {ibrowse_async_response_end, ReqId} -> ok
> +            attachment_loop(ReqId, Conn);
> +        {ibrowse_async_response_end, ReqId} ->
> +            catch ibrowse:stop_worker_process(Conn),
> +            exit(normal)
>     end.
>
> attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type,  
> Length}}) ->
>     #http_db{uri=DbUrl, headers=Headers} = DbS,
>     {Pos, [RevId|_]} = Rev,
>     Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(? 
> b2l(Name)),
> -        "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
> +        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
>     ?LOG_DEBUG("Attachment URL ~p", [Url]),
>     {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
>         Type, Length),
> @@ -389,11 +400,14 @@
>
> make_attachment_stub_receiver(Url, Headers, Name, Type, Length,  
> Retries) ->
>     %% start the process that receives attachment data from ibrowse
> -    Pid = spawn_link(fun() -> attachment_loop(nil) end),
> +    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
> +    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
> +    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
>
>     %% make the async request
> -    Options = [{stream_to, Pid}, {response_format, binary}],
> -    ReqId = case ibrowse:send_req(Url, Headers, get, [], Options,  
> infinity) of
> +    Opts = [{stream_to, Pid}, {response_format, binary}],
> +    ReqId =
> +    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts,  
> infinity) of
>         {ibrowse_req_id, X} -> X;
>         {error, _Reason} -> exit(attachment_request_failed)
>     end,
> @@ -717,6 +731,46 @@
> open_doc_revs(Db, DocId, Revs, Options) ->
>     couch_db:open_doc_revs(Db, DocId, Revs, Options).
>
> +%% @spec should_flush() -> true | false
> +%% @doc Calculates whether it's time to flush the document buffer.  
> Considers
> +%%        - memory utilization
> +%%        - number of pending document writes
> +%%        - approximate number of pending attachment writes
> +should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
> +    true;
> +should_flush(_DocCount) ->
> +    MeAndMyLinks = [self()|element(2,process_info(self(),links))],
> +
> +    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
> +    true -> true;
> +    false ->
> +        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
> +        true ->
> +            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
> +            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
> +        false -> false
> +        end
> +    end.
> +
> +%% @spec memory_footprint([pid()]) -> integer()
> +%% @doc Sum of process and binary memory utilization for all  
> processes in list
> +memory_footprint(PidList) ->
> +    ProcessMemory = lists:foldl(fun(Pid, Acc) ->
> +        Acc + element(2,process_info(Pid, memory))
> +    end, 0, PidList),
> +
> +    BinaryMemory = lists:foldl(fun(Pid, Acc) ->
> +        Acc + binary_memory(Pid)
> +    end, 0, PidList),
> +
> +    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory,  
> BinaryMemory]),
> +    ProcessMemory + BinaryMemory.
> +
> +%% @spec binary_memory(pid()) -> integer()
> +%% @doc Memory utilization of all binaries referenced by this  
> process.
> +binary_memory(Pid) ->
> +    lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
> +        0, element(2,process_info(Pid, binary))).
>
> update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc,  
> Options) ->
>     [] = Options,
>
>


Mime
View raw message