couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [19/27] rexi commit: updated refs/heads/windsor-merge to 096f0cf
Date Fri, 01 Aug 2014 09:06:48 GMT
Rename governor to buffer

Buffer describes the behavior quite a bit better than governor.

BugzId: 23717
BugzId: 23718


Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/5b589962
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/5b589962
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/5b589962

Branch: refs/heads/windsor-merge
Commit: 5b5899627db710cab64d1c724cc4ad4d7447b9d8
Parents: 8f9d160
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Tue Dec 10 09:59:16 2013 -0600
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Jul 23 18:04:13 2014 +0100

----------------------------------------------------------------------
 src/rexi_buffer.erl      |  96 ++++++++++++++++++++++++++
 src/rexi_gov_manager.erl | 157 ------------------------------------------
 src/rexi_governor.erl    |  96 --------------------------
 src/rexi_sup.erl         |   8 +--
 src/rexi_utils.erl       |   2 +-
 5 files changed, 101 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_buffer.erl
----------------------------------------------------------------------
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
new file mode 100644
index 0000000..b096c5b
--- /dev/null
+++ b/src/rexi_buffer.erl
@@ -0,0 +1,96 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_buffer).
+
+-behaviour(gen_server).
+-vsn(1).
+
+%  gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-export ([
+    send/2,
+    start_link/1
+]).
+
+-record(state, {
+    buffer = queue:new(),
+    sender = nil,
+    count = 0
+}).
+
+%% TODO Leverage os_mon to discover available memory in the system
+-define (MAX_MEMORY, 17179869184).
+
+start_link(ServerId) ->
+    gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
+
+send(Dest, Msg) ->
+    Server = list_to_atom(lists:concat([rexi_buffer, "_", get_node(Dest)])),
+    gen_server:cast(Server, {deliver, Dest, Msg}).
+
+
+init(_) ->
+    {ok, #state{}}.
+
+handle_call(get_buffered_count, _From, State) ->
+    {reply, State#state.count, State, 0}.
+
+handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
+    margaret_counter:increment([erlang, rexi, buffered]),
+    Q2 = queue:in({Dest, Msg}, Q),
+    case should_drop() of
+    true ->
+            {noreply, State#state{buffer = queue:drop(Q2)}, 0};
+    false ->
+            {noreply, State#state{buffer = Q2, count = C+1}, 0}
+    end.
+
+handle_info(timeout, #state{sender = nil} = State) ->
+    #state{buffer = Q, count = C} = State,
+    Sender = case queue:out_r(Q) of
+        {{value, {Dest, Msg}}, Q2} ->
+            case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+                ok ->
+                    nil;
+                _Else ->
+                    spawn_monitor(erlang, send, [Dest, Msg])
+            end;
+        {empty, Q2} ->
+            nil
+    end,
+    if Sender =:= nil, C > 1 ->
+        {noreply, State#state{buffer = Q2, count = C-1}, 0};
+    true ->
+        {noreply, State#state{buffer = Q2, sender = Sender, count = C-1}}
+    end;
+handle_info(timeout, State) ->
+    % Waiting on a sender to return
+    {noreply, State};
+
+handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
+    {noreply, State#state{sender = nil}, 0}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+should_drop() ->
+    erlang:memory(total) > ?MAX_MEMORY.
+
+get_node({_, Node}) when is_atom(Node) ->
+    Node;
+get_node(Pid) when is_pid(Pid) ->
+    node(Pid).

http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_gov_manager.erl
----------------------------------------------------------------------
diff --git a/src/rexi_gov_manager.erl b/src/rexi_gov_manager.erl
deleted file mode 100644
index 46cbe53..0000000
--- a/src/rexi_gov_manager.erl
+++ /dev/null
@@ -1,157 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
--module(rexi_gov_manager).
-
--behaviour(gen_server).
--vsn(1).
--behaviour(config_listener).
-
-% API
--export([start_link/0, send/2]).
-
-% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
--export([handle_config_change/5]).
-
--record(state, {node_timers = ets:new(timers, [set]),
-                nodeout_timeout = 2000,
-                pid_spawn_max = 10000}).
-
-
-% API
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-send(Dest, Msg) ->
-    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
-    ok -> ok;
-    _ ->
-        % treat nosuspend and noconnect the same
-        {ok, Governor} = get_governor(get_node(Dest)),
-        gen_server:cast(Governor, {spawn_and_track, Dest, Msg})
-    end.
-
-get_node({_, Node}) when is_atom(Node) ->
-    Node;
-get_node(Pid) when is_pid(Pid) ->
-    node(Pid).
-
-get_governor(Node) ->
-    case ets:lookup(govs, Node) of
-    [{Node, Gov}] ->
-        {ok, Gov};
-    [] ->
-        gen_server:call(?MODULE, {get_governor, Node})
-    end.
-
-% gen_server callbacks
-
-init([]) ->
-    ets:new(govs, [named_table, set, {read_concurrency, true}]),
-    net_kernel:monitor_nodes(true),
-    NodeOutTimeout = config:get("rexi","nodeout_timeout","500"),
-    PidSpawnMax = config:get("rexi","pid_spawn_max", "10000"),
-    State = #state{
-        nodeout_timeout = list_to_integer(NodeOutTimeout),
-        pid_spawn_max = list_to_integer(PidSpawnMax)
-    },
-    config:listen_for_changes(?MODULE, State),
-    {ok, State}.
-
-handle_config_change("rexi", "nodeout_timeout", Value, _, State) ->
-    IntValue = list_to_integer(Value),
-    %% Setting the timeout is cheap, no need to check if it actually changed
-    gen_server:call(?MODULE, {set_timeout, IntValue}),
-    {ok, State#state{nodeout_timeout = IntValue}};
-handle_config_change("rexi", "pid_spawn_max", Value, _, State) ->
-    IntValue = list_to_integer(Value),
-    %% Setting the timeout is cheap, no need to check if it actually changed
-    gen_server:call(?MODULE, {set_spawn_max, IntValue}),
-    {ok, State#state{pid_spawn_max = IntValue}};
-handle_config_change(_, _, _, _, State) ->
-    {ok, State}.
-
-handle_call({set_timeout, TO}, _, #state{nodeout_timeout = Old} = State) ->
-    {reply, Old, State#state{nodeout_timeout = TO}};
-handle_call({set_spawn_max, Max}, _, #state{pid_spawn_max = Old} = State) ->
-    {reply, Old, State#state{pid_spawn_max = Max}};
-handle_call({get_governor, Node}, _From,
-            #state{pid_spawn_max = PidSpawnMax} = State) ->
-    case ets:lookup(govs, Node) of
-    [] ->
-        {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
-        ets:insert(govs, {Node, Gov});
-    [{Node, Gov}] ->
-        Gov
-    end,
-    {reply, {ok, Gov}, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info({nodeup, Node}, #state{node_timers = Timers,
-                                   pid_spawn_max = PidSpawnMax} = State) ->
-    case ets:lookup(Timers, Node) of
-    [{Node, TRef}] ->
-        erlang:cancel_timer(TRef),
-        ets:delete(Timers, Node);
-    _ ->
-        ok
-    end,
-    case ets:lookup(govs, Node) of
-    [{Node, _}] ->
-        ok;
-    [] ->
-        {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
-        ets:insert(govs, {Node, Gov})
-    end,
-    {noreply, State};
-
-handle_info({nodedown, Node}, #state{node_timers = Timers,
-                                     nodeout_timeout = NodeTimeout} = State) ->
-    case ets:lookup(Timers, Node) of
-    [] ->
-        TRef = erlang:send_after(NodeTimeout, self(), {nodeout, Node}),
-        ets:insert(Timers, {Node, TRef}),
-        {noreply, State};
-    _ ->
-        {noreply, State}
-    end;
-
-handle_info({nodeout, Node}, #state{node_timers = Timers} = State) ->
-    % check for race with node up
-    case ets:member(Timers, Node) of
-    true ->
-        ets:delete(Timers, Node),
-        case ets:lookup(govs, Node) of
-        [] ->
-            ok;
-        [{Node, Governor}] ->
-            gen_server:cast(Governor, nodeout)
-        end;
-    false ->
-        ok
-    end,
-    {noreply, State};
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-% Internal functions

http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_governor.erl
----------------------------------------------------------------------
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
deleted file mode 100644
index 12ec013..0000000
--- a/src/rexi_governor.erl
+++ /dev/null
@@ -1,96 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
--module(rexi_governor).
-
--behaviour(gen_server).
--vsn(1).
-
-%  gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--export ([
-    send/2,
-    start_link/1
-]).
-
--record(state, {
-    buffer = queue:new(),
-    sender = nil,
-    count = 0
-}).
-
-%% TODO Leverage os_mon to discover available memory in the system
--define (MAX_MEMORY, 17179869184).
-
-start_link(ServerId) ->
-    gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
-
-send(Dest, Msg) ->
-    Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])),
-    gen_server:cast(Server, {deliver, Dest, Msg}).
-
-
-init(_) ->
-    {ok, #state{}}.
-
-handle_call(get_buffered_count, _From, State) ->
-    {reply, State#state.count, State, 0}.
-
-handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
-    margaret_counter:increment([erlang, rexi, buffered]),
-    Q2 = queue:in({Dest, Msg}, Q),
-    case should_drop() of
-    true ->
-            {noreply, State#state{buffer = queue:drop(Q2)}, 0};
-    false ->
-            {noreply, State#state{buffer = Q2, count = C+1}, 0}
-    end.
-
-handle_info(timeout, #state{sender = nil} = State) ->
-    #state{buffer = Q, count = C} = State,
-    Sender = case queue:out_r(Q) of
-        {{value, {Dest, Msg}}, Q2} ->
-            case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
-                ok ->
-                    nil;
-                _Else ->
-                    spawn_monitor(erlang, send, [Dest, Msg])
-            end;
-        {empty, Q2} ->
-            nil
-    end,
-    if Sender =:= nil, C > 1 ->
-        {noreply, State#state{buffer = Q2, count = C-1}, 0};
-    true ->
-        {noreply, State#state{buffer = Q2, sender = Sender, count = C-1}}
-    end;
-handle_info(timeout, State) ->
-    % Waiting on a sender to return
-    {noreply, State};
-
-handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
-    {noreply, State#state{sender = nil}, 0}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-should_drop() ->
-    erlang:memory(total) > ?MAX_MEMORY.
-
-get_node({_, Node}) when is_atom(Node) ->
-    Node;
-get_node(Pid) when is_pid(Pid) ->
-    node(Pid).

http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_sup.erl
----------------------------------------------------------------------
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index f9e4933..55c4829 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -46,16 +46,16 @@ init([]) ->
             [rexi_server_mon]
         },
         {
-            rexi_governor_sup,
-            {rexi_server_sup, start_link, [rexi_governor_sup]},
+            rexi_buffer_sup,
+            {rexi_server_sup, start_link, [rexi_buffer_sup]},
             permanent,
             100,
             supervisor,
             [rexi_server_sup]
         },
         {
-            rexi_governor_mon,
-            {rexi_server_mon, start_link, [rexi_governor]},
+            rexi_buffer_mon,
+            {rexi_server_mon, start_link, [rexi_buffer]},
             permanent,
             100,
             worker,

http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_utils.erl
----------------------------------------------------------------------
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 3c89ca9..e3eaa6f 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -34,7 +34,7 @@ send(Dest, Msg) ->
         ok;
     _ ->
         % treat nosuspend and noconnect the same
-        rexi_governor:send(Dest, Msg)
+        rexi_buffer:send(Dest, Msg)
     end.
 
 %% @doc set up the receive loop with an overall timeout


Mime
View raw message