couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [1/2] couch-replicator commit: updated refs/heads/master to e2ecd85
Date Fri, 16 Oct 2015 09:50:42 GMT
Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master 219f5d181 -> e2ecd854e


Fix race condition in worker release on connection_closing state.

This is exposed in the replicator large attachments test case,
replicating from local to remote. In the current test configuration
it appears about once in 20-40 times. The failure manifests as
an {error, req_timedout} exception in the logs from one of the
PUT methods, during push replication. Then database comparison fails
because not all documents made it to the target.

Gory details:

After ibrowse receives Connection: Close header it will go into
shutdown 'connection_closing' state.

couch_replicator_httpc handles that state by trying to close
the socket and retrying, hoping that it would pick up a new worker from
the pool on next retry in couch_replicator_httpc.erl:

```
process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
    ...
```

But it did not directly have a way to ensure socket is really closed,
instead it called ibrowse_http_client:stop(Worker). That didn't wait for
worker to die, also worker was returned back to the pool asynchronously,
in the 'after' clause in couch_replicator_httpc:send_req/3.

This worker, which could still be alive but in the process of dying,
could have been picked up immediately during the retry.
ibrowse in ibrowse:do_send_req/7 will handle a dead worker
process as {error, req_timedout}, which is what the intermittent
test failure showed in the log.

The fix:

 * Make sure worker is really stopped after calling stop.

 * Make sure worker is returned to the pool synchronously. So that
   on retry, a worker in a known good state is picked up.

COUCHDB-2833


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/307ae6d9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/307ae6d9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/307ae6d9

Branch: refs/heads/master
Commit: 307ae6d92fd300d31d6eb96bd5aca5b9a558a66d
Parents: 36e5bec
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Thu Oct 15 13:54:10 2015 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Thu Oct 15 13:54:10 2015 -0400

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl      | 18 ++++++++-
 src/couch_replicator_httpc_pool.erl | 63 ++++++++++++++++++--------------
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/307ae6d9/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 8b34e0e..668b218 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -103,6 +103,20 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     {Worker, Response}.
 
 
+%% Stop worker, wait for it to die, then release it. Make sure it is dead before
+%% releasing it to the pool, so there is not race triggered recycling it again.
+%% The reason is recycling a dying worker, could end up that worker returning
+%% {error, req_timedout} error. While in reality is not really a timeout, just
+%% a race condition.
+stop_and_release_worker(Pool, Worker) ->
+    Ref = erlang:monitor(process, Worker),
+    ibrowse_http_client:stop(Worker),
+    receive
+        {'DOWN', Ref, _, _, _} ->
+            ok
+    end,
+    ok = couch_replicator_httpc_pool:release_worker_sync(Pool, Worker).
+
 process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
     throw({retry, HttpDb, Params});
 
@@ -110,8 +124,8 @@ process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
 %% For example, if server responds to a request, sets Connection: close header
 %% and closes the socket, ibrowse will detect that error when it sends
 %% next request.
-process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb)->
-    ibrowse_http_client:stop(Worker),
+process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
+    stop_and_release_worker(HttpDb#httpdb.httpc_pool, Worker),
     throw({retry, HttpDb, Params});
 
 process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/307ae6d9/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index c895048..09e3b23 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -16,7 +16,7 @@
 
 % public API
 -export([start_link/2, stop/1]).
--export([get_worker/1, release_worker/2]).
+-export([get_worker/1, release_worker/2, release_worker_sync/2]).
 
 % gen_server API
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
@@ -52,6 +52,8 @@ get_worker(Pool) ->
 release_worker(Pool, Worker) ->
     ok = gen_server:cast(Pool, {release_worker, Worker}).
 
+release_worker_sync(Pool, Worker) ->
+    ok = gen_server:call(Pool, {release_worker_sync, Worker}).
 
 init({Url, Options}) ->
     process_flag(trap_exit, true),
@@ -91,36 +93,13 @@ handle_call(get_worker, From, State) ->
     end;
 
 handle_call(stop, _From, State) ->
-    {stop, normal, ok, State}.
+    {stop, normal, ok, State};
 
+handle_call({release_worker_sync, Worker}, _From, State) ->
+    {reply, ok, release_worker_internal(Worker, State)}.
 
 handle_cast({release_worker, Worker}, State) ->
-    #state{waiting = Waiting, callers = Callers} = State,
-    NewCallers0 = demonitor_client(Callers, Worker),
-    case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
-    true ->
-        case queue:out(Waiting) of
-        {empty, Waiting2} ->
-            NewCallers1 = NewCallers0,
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
-        {{value, From}, Waiting2} ->
-            NewCallers1 = monitor_client(NewCallers0, Worker, From),
-            gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
-        end,
-        NewState = State#state{
-           busy = Busy2,
-           free = Free2,
-           waiting = Waiting2,
-           callers = NewCallers1
-        },
-        {noreply, NewState};
-   false ->
-        {noreply, State#state{callers = NewCallers0}}
-   end.
+    {noreply, release_worker_internal(Worker, State)}.
 
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
@@ -183,3 +162,31 @@ demonitor_client(Callers, Worker) ->
         false ->
             Callers
     end.
+
+release_worker_internal(Worker, State) ->
+    #state{waiting = Waiting, callers = Callers} = State,
+    NewCallers0 = demonitor_client(Callers, Worker),
+    case is_process_alive(Worker) andalso
+        lists:member(Worker, State#state.busy) of
+    true ->
+        case queue:out(Waiting) of
+        {empty, Waiting2} ->
+            NewCallers1 = NewCallers0,
+            Busy2 = State#state.busy -- [Worker],
+            Free2 = [Worker | State#state.free];
+        {{value, From}, Waiting2} ->
+            NewCallers1 = monitor_client(NewCallers0, Worker, From),
+            gen_server:reply(From, {ok, Worker}),
+            Busy2 = State#state.busy,
+            Free2 = State#state.free
+        end,
+        NewState = State#state{
+           busy = Busy2,
+           free = Free2,
+           waiting = Waiting2,
+           callers = NewCallers1
+        },
+        NewState;
+   false ->
+        State#state{callers = NewCallers0}
+   end.


Mime
View raw message