Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B77C1172B for ; Fri, 1 Aug 2014 12:06:03 +0000 (UTC) Received: (qmail 11872 invoked by uid 500); 1 Aug 2014 12:06:02 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 11754 invoked by uid 500); 1 Aug 2014 12:06:02 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 11467 invoked by uid 99); 1 Aug 2014 12:06:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Aug 2014 12:06:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 56FB69BCFEA; Fri, 1 Aug 2014 12:06:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Fri, 01 Aug 2014 12:06:10 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/11] git commit: Use gen_server:cast rather than rexi RPCs Use gen_server:cast rather than rexi RPCs Prior to this commit, the global_changes_listener would send updates to the global_changes_server via a rexi RPC which only did a gen_server:call. This is needlessly heavyweight and would cause gen_server:call timeout errors when the global_changes_server was slow (usually due to slow disk IO). This commit removes the rexi RPC and replaces it with a gen_server:cast. Since this causes all global_changes_server handlers to not reply, this commit also removes the format_reply function as it's unnecessary. BugzID: 28242 Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/b226a241 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/b226a241 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/b226a241 Branch: refs/heads/windsor-merge Commit: b226a241a8cda786d16e951bb9ffaf374c657268 Parents: 2b2005a Author: Benjamin Bastian Authored: Wed Feb 19 13:41:28 2014 -0800 Committer: Robert Newson Committed: Fri Aug 1 13:03:55 2014 +0100 ---------------------------------------------------------------------- src/global_changes_listener.erl | 3 +- src/global_changes_server.erl | 55 ++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/b226a241/src/global_changes_listener.erl ---------------------------------------------------------------------- diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl index d836f2b..ccee705 100644 --- a/src/global_changes_listener.erl +++ b/src/global_changes_listener.erl @@ -114,8 +114,7 @@ maybe_send_updates(#state{update_db=true}=State) -> Grouped -> dict:map(fun(Node, Docs) -> Metric = [global_changes, rpcs], - MFA = {global_changes_server, update_docs, [Docs]}, - rexi:cast(Node, MFA) + global_changes_server:update_docs(Node, Docs) end, Grouped) catch error:database_does_not_exist -> ok http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/b226a241/src/global_changes_server.erl ---------------------------------------------------------------------- diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl index 619313f..966d866 100644 --- a/src/global_changes_server.erl +++ b/src/global_changes_server.erl @@ -19,7 +19,7 @@ ]). -export([ - update_docs/1 + update_docs/2 ]). @@ -67,20 +67,23 @@ terminate(_Reason, _Srv) -> ok. -handle_call(_Msg, _From, #state{update_db=false}=State) -> - {reply, ok, State}; -handle_call({update_docs, DocIds}, _From, State) -> +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + + +handle_cast(_Msg, #state{update_db=false}=State) -> + {noreply, State}; +handle_cast({update_docs, DocIds}, State) -> Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates), NewState = State#state{ pending_updates=Pending, pending_update_count=sets:size(Pending) }, - format_reply(reply, maybe_update_docs(NewState)). - + maybe_update_docs(NewState); handle_cast({set_max_write_delay, MaxWriteDelay}, State) -> NewState = State#state{max_write_delay=MaxWriteDelay}, - format_reply(noreply, maybe_update_docs(NewState)); + maybe_update_docs(NewState); handle_cast({set_update_db, Boolean}, State0) -> % If turning update_db off, clear out server state State = case {Boolean, State0#state.update_db} of @@ -94,9 +97,9 @@ handle_cast({set_update_db, Boolean}, State0) -> _ -> State0#state{update_db=Boolean} end, - format_reply(noreply, maybe_update_docs(State)); + maybe_update_docs(State); handle_cast(_Msg, State) -> - format_reply(noreply, maybe_update_docs(State)). + maybe_update_docs(State). handle_info(start_listener, State) -> @@ -104,13 +107,13 @@ handle_info(start_listener, State) -> NewState = State#state{ handler_ref=erlang:monitor(process, Handler) }, - format_reply(noreply, maybe_update_docs(NewState)); + maybe_update_docs(NewState); handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) -> couch_log:error("global_changes_listener terminated: ~w", [Reason]), erlang:send_after(5000, self(), start_listener), - format_reply(noreply, maybe_update_docs(State)); + maybe_update_docs(State); handle_info(_, State) -> - format_reply(noreply, maybe_update_docs(State)). + maybe_update_docs(State). code_change(_OldVsn, State, _Extra) -> @@ -118,13 +121,13 @@ code_change(_OldVsn, State, _Extra) -> maybe_update_docs(#state{pending_update_count=0}=State) -> - State; + {noreply, State}; maybe_update_docs(#state{update_db=true}=State) -> #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State, Now = os:timestamp(), case LastUpdateTime of undefined -> - {State#state{last_update_time=Now}, MaxWriteDelay}; + {noreply, State#state{last_update_time=Now}, MaxWriteDelay}; _ -> Delta = round(timer:now_diff(Now, LastUpdateTime)/1000), if Delta >= MaxWriteDelay -> @@ -147,23 +150,23 @@ maybe_update_docs(#state{update_db=true}=State) -> Count = State#state.pending_update_count, catch error:database_does_not_exist -> - ok + {noreply, State} end, - State#state{ + {noreply, State#state{ pending_updates=sets:new(), pending_update_count=0, last_update_time=undefined - }; + }}; true -> - {State, MaxWriteDelay-Delta} + {noreply, State, MaxWriteDelay-Delta} end end; maybe_update_docs(State) -> - State. + {noreply, State}. -update_docs(Updates) -> - gen_server:call(?MODULE, {update_docs, Updates}). +update_docs(Node, Updates) -> + gen_server:cast({?MODULE, Node}, {update_docs, Updates}). group_ids_by_shard(DbName, DocIds) -> @@ -179,16 +182,6 @@ group_ids_by_shard(DbName, DocIds) -> end, dict:new(), DocIds). -format_reply(reply, #state{}=State) -> - {reply, ok, State}; -format_reply(reply, {State, Timeout}) -> - {reply, ok, State, Timeout}; -format_reply(noreply, #state{}=State) -> - {noreply, State}; -format_reply(noreply, {State, Timeout}) -> - {noreply, State, Timeout}. - - get_docs_locally(Shard, Ids) -> lists:map(fun(Id) -> DocInfo = couch_db:get_doc_info(Shard, Id),