couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [01/50] ibrowse commit: updated refs/heads/upstream to b28542d
Date Mon, 22 Aug 2016 11:19:27 GMT
Repository: couchdb-ibrowse
Updated Branches:
  refs/heads/upstream [created] b28542d1e


Changed pipeline algo to smallest pipeline first

Big commit. Switched algorithm to one which will favor
the connection with the smallest pipeline first
(deciding ties by timestamp of last finished request,
and then by pid as ultimate tie breaker).

Note: this also drastically changes the internal
representation of the connection in ets and is dependent
on specific order of operations when changing key values
to limit risk of race conditions between loadbalancer
and a given connection.

Also removed connection reporting of start of request
as this was no longer necessary since the load balancer
tees up the entry into ets with a 1.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/commit/9d0b7e3e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/tree/9d0b7e3e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/diff/9d0b7e3e

Branch: refs/heads/upstream
Commit: 9d0b7e3eea12a72ae619e6f34aab349b25893eef
Parents: 3061aa2
Author: benjaminplee <yardspoon@gmail.com>
Authored: Wed Nov 19 21:50:54 2014 +0000
Committer: benjaminplee <yardspoon@gmail.com>
Committed: Thu Nov 20 16:15:51 2014 +0000

----------------------------------------------------------------------
 src/ibrowse_http_client.erl       | 44 ++++++++-------------
 src/ibrowse_lb.erl                | 71 ++++++++++++++++++++++------------
 test/ibrowse_functional_tests.erl | 11 ++----
 3 files changed, 66 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/src/ibrowse_http_client.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index 1bb95d2..c9161b0 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -762,11 +762,10 @@ send_req_1(From,
                 {ok, _Sent_body} ->
                     trace_request_body(Body_1),
                     _ = active_once(State_1),
-                    State_1_1 = inc_pipeline_counter(State_1),
-                    State_2 = State_1_1#state{status     = get_header,
-                                              cur_req    = NewReq,
-                                              proxy_tunnel_setup = in_progress,
-                                              tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
+                    State_2 = State_1#state{status     = get_header,
+                                            cur_req    = NewReq,
+                                            proxy_tunnel_setup = in_progress,
+                                            tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
                     State_3 = set_inac_timer(State_2),
                     {noreply, State_3};
                 Err ->
@@ -853,15 +852,14 @@ send_req_1(From,
                     Raw_req = list_to_binary([Req, Sent_body]),
                     NewReq_1 = NewReq#request{raw_req = Raw_req},
                     State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)},
-                    State_2 = inc_pipeline_counter(State_1),
-                    _ = active_once(State_2),
-                    State_3 = case Status of
+                    _ = active_once(State_1),
+                    State_2 = case Status of
                                   idle ->
-                                      State_2#state{
+                                      State_1#state{
                                         status     = get_header,
                                         cur_req    = NewReq_1};
                                   _ ->
-                                      State_2
+                                      State_1
                               end,
                     case StreamTo of
                         undefined ->
@@ -875,8 +873,8 @@ send_req_1(From,
                                     catch StreamTo ! {ibrowse_async_raw_req, Raw_req}
                             end
                     end,
-                    State_4 = set_inac_timer(State_3),
-                    {noreply, State_4};
+                    State_3 = set_inac_timer(State_2),
+                    {noreply, State_3};
                 Err ->
                     shutting_down(State),
                     do_trace("Send failed... Reason: ~p~n", [Err]),
@@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) ->
 do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
     Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
     gen_server:reply(From, Msg_1),
-    dec_pipeline_counter(State);
+    report_request_complete(State);
 do_reply(State, From, undefined, _, _, Msg) ->
     gen_server:reply(From, Msg),
-    dec_pipeline_counter(State);
+    report_request_complete(State);
 do_reply(#state{prev_req_id = Prev_req_id} = State,
          _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
-    State_1 = dec_pipeline_counter(State),
+    State_1 = report_request_complete(State),
     case Body of
         [] ->
             ok;
@@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State,
     ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}),
     State_1#state{prev_req_id = ReqId};
 do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
-    State_1 = dec_pipeline_counter(State),
+    State_1 = report_request_complete(State),
     Msg_1 = format_response_data(Resp_format, Msg),
     catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
     State_1.
@@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
 shutting_down(#state{lb_ets_tid = Tid}) ->
     ibrowse_lb:report_connection_down(Tid).
 
-inc_pipeline_counter(#state{is_closing = true} = State) ->
-    State;
-inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
-    State;
-inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
-    ibrowse_lb:report_request_underway(Tid),
-    State.
-
-dec_pipeline_counter(#state{is_closing = true} = State) ->
+report_request_complete(#state{is_closing = true} = State) ->
     State;
-dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
+report_request_complete(#state{lb_ets_tid = undefined} = State) ->
     State;
-dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
+report_request_complete(#state{lb_ets_tid = Tid} = State) ->
     ibrowse_lb:report_request_complete(Tid),
     State.
 

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/src/ibrowse_lb.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index cc067fc..3d487d4 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -16,7 +16,6 @@
 	 spawn_connection/6,
      stop/1,
      report_connection_down/1,
-     report_request_underway/1,
      report_request_complete/1
 	]).
 
@@ -39,6 +38,9 @@
                 proc_state}).
 
 -define(PIPELINE_MAX, 99999).
+-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]).
+-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]).
+-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]).
 
 -include("ibrowse.hrl").
 
@@ -74,13 +76,23 @@ stop(Lb_pid) ->
     end.
 
 report_connection_down(Tid) ->
-    catch ets:delete(Tid, self()).
-
-report_request_underway(Tid) ->
-    catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
+    %% Don't cascade errors since Tid is really managed by other process
+    catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())).
 
 report_request_complete(Tid) ->
-    catch ets:update_counter(Tid, self(), {2, -1, 0, 0}).
+    %% Don't cascade errors since Tid is really managed by other process
+    catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of
+        [MatchKey] ->
+            case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of
+                1 ->
+                    ets:insert(Tid, {decremented(MatchKey), undefined}),
+                    true;
+                _ ->
+                    false
+            end;
+        _ ->
+            false
+    end.
 
 %%====================================================================
 %% Server functions
@@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-find_best_connection(Tid, Max_pipe) ->
-    find_best_connection(ets:first(Tid), Tid, Max_pipe).
-
-find_best_connection('$end_of_table', _, _) ->
-    {error, retry_later};
-find_best_connection(Pid, Tid, Max_pipe) ->
-    case ets:lookup(Tid, Pid) of
-        [{Pid, Cur_sz}] when Cur_sz < Max_pipe ->
-            case record_request_for_connection(Tid, Pid) of
-                {'EXIT', _} ->
-                    %% The selected process has shutdown
-                    find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
-                _ ->
-                    {ok, Pid}
+find_best_connection(Tid, Max_pipeline_size) ->
+    case ets:first(Tid) of
+        {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size ->
+            case record_request_for_connection(Tid, Key) of
+                true ->
+                    {ok, Pid};
+                false ->
+                    find_best_connection(Tid, Max_pipeline_size)
             end;
-         _ ->
-            find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
+        _ -> 
+            {error, retry_later}
     end.
 
 maybe_create_ets(#state{ets_tid = undefined} = State) ->
@@ -240,10 +246,25 @@ num_current_connections(Tid) ->
     catch ets:info(Tid, size).
 
 record_new_connection(Tid, Pid) ->
-    catch ets:insert(Tid, {Pid, 0}).
+    catch ets:insert(Tid, {new_key(Pid), undefined}).
+
+record_request_for_connection(Tid, Key) ->
+    case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of
+        1 ->
+            ets:insert(Tid, {incremented(Key), undefined}),
+            true;
+        _ ->
+            false
+    end.
+
+new_key(Pid) ->
+    {1, os:timestamp(), Pid}.
+
+incremented({Size, Timestamp, Pid}) ->
+    {Size + 1, Timestamp, Pid}.
 
-record_request_for_connection(Tid, Pid) ->
-    catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
+decremented({Size, _Timestamp, Pid}) ->
+    {Size - 1, os:timestamp(), Pid}.
 
 for_each_connection_pid(Tid, Fun) ->
     catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid),

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/test/ibrowse_functional_tests.erl
----------------------------------------------------------------------
diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl
index 0a68e14..fc7afec 100644
--- a/test/ibrowse_functional_tests.erl
+++ b/test/ibrowse_functional_tests.erl
@@ -56,15 +56,10 @@ balanced_connections() ->
 
     timer:sleep(1000),
 
-    Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
-    ?assertEqual(MaxSessions, length(Diffs)),
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
 
-    lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs).
-
-close_to_zero(0) -> yep;
-close_to_zero(-1) -> yep;
-close_to_zero(1) -> yep;
-close_to_zero(X) -> {nope, X}.
+    ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
 
 times(0, _) ->
     ok;


Mime
View raw message