couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl
Date Sat, 16 May 2009 18:58:18 GMT
Author: kocolosk
Date: Sat May 16 18:58:18 2009
New Revision: 775507

URL: http://svn.apache.org/viewvc?rev=775507&view=rev
Log:
replicator memory management and buffer flush calculation updates

* new should_flush fun considers ndocs, nattachments, memory in making decision
* memory utilized by attachment receivers is accounted for
* download attachments using standalone connections instead of conn pool.  This
  prevents a document request from getting stuck behind a huge attachment, which
  would prevent us from triggering a buffer flush in time.  We also consider the
  memory utilization of the standalone ibrowse connection in should_flush

Modified:
    couchdb/trunk/src/couchdb/couch_rep.erl

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775507&r1=775506&r2=775507&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Sat May 16 18:58:18 2009
@@ -17,7 +17,12 @@
 
 -export([replicate/2]).
 
+-define(BUFFER_NDOCS, 1000).
+-define(BUFFER_NATTACHMENTS, 50).
+-define(BUFFER_MEMORY, 10000000). %% bytes
+
 -include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
 
 %% @spec replicate(Source::binary(), Target::binary()) -> 
 %%      {ok, Stats} | {error, Reason}
@@ -202,7 +207,8 @@
     ets:update_counter(Stats, docs_read, length(Docs)),
     
     %% save them (maybe in a buffer)
-    {NewBuffer, NewContext} = case couch_util:should_flush() of
+    {NewBuffer, NewContext} = 
+    case should_flush(lists:flatlength([Docs|Buffer])) of
         true ->
             Docs2 = lists:flatten([Docs|Buffer]),
             {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
@@ -222,7 +228,7 @@
     ets:update_counter(State#state.stats, total_revs, RevsCount),
     case State#state.listeners of
     [] ->
-        % still waiting for the first listener to sen a request
+        % still waiting for the first listener to send a request
         {noreply, State#state{current_seq=LastSeq,done=true}};
     _ ->
         {stop, normal, ok, State#state{current_seq=LastSeq}}
@@ -327,13 +333,13 @@
         [Id, couch_doc:rev_to_str(Rev), Error]),
     dump_update_errors(Rest).
 
-attachment_loop(ReqId) ->
+attachment_loop(ReqId, Conn) ->
     couch_util:should_flush(),
     receive 
         {From, {set_req_id, NewId}} ->
             %% we learn the ReqId to listen for
             From ! {self(), {ok, NewId}},
-            attachment_loop(NewId);
+            attachment_loop(NewId, Conn);
         {ibrowse_async_headers, ReqId, Status, Headers} ->
             %% we got header, give the controlling process a chance to react
             receive 
@@ -343,37 +349,42 @@
                     receive
                         {From, continue} -> 
                             %% normal case
-                            attachment_loop(ReqId);
+                            attachment_loop(ReqId, Conn);
                         {From, fail} ->
                             %% error, failure code
                             ?LOG_ERROR(
                                 "streaming attachment failed with status ~p",
                                 [Status]),
+                            catch ibrowse:stop_worker_process(Conn),
                             exit(attachment_request_failed);
                         {From, stop_ok} ->
                             %% stop looping, controller will start a new loop
+                            catch ibrowse:stop_worker_process(Conn),
                             stop_ok
                     end
             end,
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
         {ibrowse_async_response, ReqId, {chunk_start,_}} ->
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
         {ibrowse_async_response, ReqId, chunk_end} ->
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
         {ibrowse_async_response, ReqId, {error, Err}} ->
             ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+            catch ibrowse:stop_worker_process(Conn),
             exit(attachment_request_failed);
         {ibrowse_async_response, ReqId, Data} -> 
             receive {From, gimme_data} -> From ! {self(), Data} end,
-            attachment_loop(ReqId);
-        {ibrowse_async_response_end, ReqId} -> ok
+            attachment_loop(ReqId, Conn);
+        {ibrowse_async_response_end, ReqId} ->
+            catch ibrowse:stop_worker_process(Conn),
+            exit(normal)
     end.
 
 attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
     #http_db{uri=DbUrl, headers=Headers} = DbS,
     {Pos, [RevId|_]} = Rev,
     Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
-        "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
+        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
     ?LOG_DEBUG("Attachment URL ~p", [Url]),
     {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name, 
         Type, Length),
@@ -389,11 +400,14 @@
     
 make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
     %% start the process that receives attachment data from ibrowse
-    Pid = spawn_link(fun() -> attachment_loop(nil) end),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
+    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
     
     %% make the async request
-    Options = [{stream_to, Pid}, {response_format, binary}],
-    ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+    Opts = [{stream_to, Pid}, {response_format, binary}],
+    ReqId = 
+    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
         {ibrowse_req_id, X} -> X;
         {error, _Reason} -> exit(attachment_request_failed)
     end,
@@ -717,6 +731,46 @@
 open_doc_revs(Db, DocId, Revs, Options) ->
     couch_db:open_doc_revs(Db, DocId, Revs, Options).
 
+%% @spec should_flush() -> true | false
+%% @doc Calculates whether it's time to flush the document buffer. Considers
+%%        - memory utilization
+%%        - number of pending document writes
+%%        - approximate number of pending attachment writes
+should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
+    true;
+should_flush(_DocCount) ->
+    MeAndMyLinks = [self()|element(2,process_info(self(),links))],
+    
+    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
+    true -> true;
+    false ->
+        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
+        true ->
+            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
+            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
+        false -> false
+        end
+    end.
+
+%% @spec memory_footprint([pid()]) -> integer()
+%% @doc Sum of process and binary memory utilization for all processes in list
+memory_footprint(PidList) ->
+    ProcessMemory = lists:foldl(fun(Pid, Acc) ->
+        Acc + element(2,process_info(Pid, memory))
+    end, 0, PidList),
+    
+    BinaryMemory = lists:foldl(fun(Pid, Acc) ->
+        Acc + binary_memory(Pid)
+    end, 0, PidList),
+    
+    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
+    ProcessMemory + BinaryMemory.
+
+%% @spec binary_memory(pid()) -> integer()
+%% @doc Memory utilization of all binaries referenced by this process.
+binary_memory(Pid) ->
+    lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+        0, element(2,process_info(Pid, binary))).
 
 update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
     [] = Options,



Mime
View raw message