couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: fdman...@apache.org
Subject: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
Date: Mon, 24 Jan 2011 13:46:11 GMT
Author: fdmanana
Date: Mon Jan 24 13:46:11 2011
New Revision: 1062772

URL: http://svn.apache.org/viewvc?rev=1062772&view=rev
Log:
Refactoring of the replicator database listener

Simpler implementation and more reliable behaviour when the replicator
database is deleted or changed on the fly.

Modified:
    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062772&r1=1062771&r2=1062772&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 13:46:11 2011
@@ -18,98 +18,113 @@
 
 -include("couch_db.hrl").
 
--define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
 
 -record(state, {
     changes_feed_loop = nil,
-    changes_queue = nil,
-    changes_processor = nil,
-    db_notifier = nil
+    db_notifier = nil,
+    rep_db_name = nil,
+    rep_start_pids = []
 }).
 
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 init(_) ->
     process_flag(trap_exit, true),
-    {ok, Queue} = couch_work_queue:new(
-        [{max_size, 1024 * 1024}, {max_items, 1000}]),
-    {ok, Processor} = changes_processor(Queue),
-    {ok, Loop} = changes_feed_loop(Queue),
+    ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
+    ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
     Server = self(),
     ok = couch_config:register(
-        fun("replicator", "db") ->
-            ok = gen_server:cast(Server, rep_db_changed)
+        fun("replicator", "db", NewName) ->
+            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
         end
     ),
+    {Loop, RepDbName} = changes_feed_loop(),
     {ok, #state{
         changes_feed_loop = Loop,
-        changes_queue = Queue,
-        changes_processor = Processor,
+        rep_db_name = RepDbName,
         db_notifier = db_update_notifier()}
     }.
 
 
+handle_call({rep_db_update, Change}, _From, State) ->
+    {reply, ok, process_update(State, Change)};
+
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
         [Msg, From]),
     {stop, {error, {unexpected_call, Msg}}, State}.
 
 
-handle_cast(rep_db_changed, State) ->
-    #state{
-        changes_feed_loop = Loop,
-        changes_queue = Queue
-    } = State,
-    catch unlink(Loop),
-    catch exit(Loop, rep_db_changed),
-    couch_work_queue:queue(Queue, stop_all_replications),
-    {ok, NewLoop} = changes_feed_loop(Queue),
-    {noreply, State#state{changes_feed_loop = NewLoop}};
-
-handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) ->
-    catch unlink(Loop),
-    catch exit(Loop, rep_db_changed),
-    {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
-    {noreply, State#state{changes_feed_loop = NewLoop}};
+handle_cast({rep_db_changed, NewName},
+        #state{rep_db_name = NewName} = State) ->
+    {noreply, State};
+
+handle_cast({rep_db_changed, _NewName}, State) ->
+    {noreply, restart(State)};
+
+handle_cast({rep_db_created, NewName},
+        #state{rep_db_name = NewName} = State) ->
+    {noreply, State};
+
+handle_cast({rep_db_created, _NewName}, State) ->
+    {noreply, restart(State)};
 
 handle_cast(Msg, State) ->
     ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
 
+
 handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
     % replicator DB deleted
-    couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
-    {noreply, State#state{changes_feed_loop = nil}};
+    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
 
 handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
     ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
-handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
-    ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
-    {stop, {rep_db_changes_processor_died, Reason}, State}.
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+    % one of the replication start processes terminated successfully
+    {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info(Msg, State) ->
+    ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]),
+    {stop, {unexpected_msg, Msg}, State}.
 
 
 terminate(_Reason, State) ->
     #state{
+        rep_start_pids = StartPids,
         changes_feed_loop = Loop,
-        changes_queue = Queue
+        db_notifier = Notifier
     } = State,
-    exit(Loop, stop),
-    % closing the queue will cause changes_processor to shutdown
-    couch_work_queue:close(Queue),
-    ok.
+    stop_all_replications(),
+    lists:foreach(
+        fun(Pid) ->
+            catch unlink(Pid),
+            catch exit(Pid, stop)
+        end,
+        [Loop | StartPids]),
+    true = ets:delete(?REP_ID_TO_DOC_ID),
+    true = ets:delete(?DOC_ID_TO_REP_ID),
+    couch_db_update_notifier:stop(Notifier).
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-changes_feed_loop(ChangesQueue) ->
+changes_feed_loop() ->
     {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+    Server = self(),
     Pid = spawn_link(
         fun() ->
             ChangesFeedFun = couch_changes:handle_changes(
@@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->
                 fun({change, Change, _}, _) ->
                     case has_valid_rep_id(Change) of
                     true ->
-                        couch_work_queue:queue(ChangesQueue, Change);
+                        ok = gen_server:call(
+                            Server, {rep_db_update, Change}, infinity);
                     false ->
                         ok
                     end;
@@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->
         end
     ),
     couch_db:close(RepDb),
-    {ok, Pid}.
+    {Pid, couch_db:name(RepDb)}.
+
+
+has_valid_rep_id({Change}) ->
+    has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+    false;
+has_valid_rep_id(_Else) ->
+    true.
 
 
 db_update_notifier() ->
@@ -146,121 +170,106 @@ db_update_notifier() ->
         fun({created, DbName}) ->
             case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
             DbName ->
-                ok = gen_server:cast(Server, rep_db_created);
+                ok = gen_server:cast(Server, {rep_db_created, DbName});
             _ ->
                 ok
             end;
         (_) ->
+            % no need to handle the 'deleted' event - the changes feed loop
+            % dies when the database is deleted
             ok
         end
     ),
     Notifier.
 
 
-changes_processor(ChangesQueue) ->
-    Pid = spawn_link(
-        fun() ->
-            ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
-            ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
-            consume_changes(ChangesQueue),
-            true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
-            true = ets:delete(?DOC_TO_REP_ID_MAP)
-        end
-    ),
-    {ok, Pid}.
-
-
-consume_changes(ChangesQueue) ->
-    case couch_work_queue:dequeue(ChangesQueue) of
-    closed ->
-        ok;
-    {ok, Changes} ->
-        lists:foreach(fun process_change/1, Changes),
-        consume_changes(ChangesQueue)
-    end.
-
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+    stop_all_replications(),
+    lists:foreach(
+        fun(Pid) ->
+            catch unlink(Pid),
+            catch exit(Pid, rep_db_changed)
+        end,
+        [Loop | StartPids]),
+    {NewLoop, NewRepDbName} = changes_feed_loop(),
+    State#state{
+        changes_feed_loop = NewLoop,
+        rep_db_name = NewRepDbName,
+        rep_start_pids = []
+    }.
 
-process_change(stop_all_replications) ->
-    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
-        "was deleted or changed", []),
-    stop_all_replications();
 
-process_change({Change}) ->
-    {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
-    DocId = couch_util:get_value(<<"_id">>, RepProps),
-    case couch_util:get_value(<<"deleted">>, Change, false) of
+process_update(State, {Change}) ->
+    {RepProps} = JsonRepDoc = get_value(doc, Change),
+    DocId = get_value(<<"_id">>, RepProps),
+    case get_value(<<"deleted">>, Change, false) of
     true ->
-        rep_doc_deleted(DocId);
+        rep_doc_deleted(DocId),
+        State;
     false ->
-        case couch_util:get_value(<<"_replication_state">>, RepProps) of
+        case get_value(<<"_replication_state">>, RepProps) of
         <<"completed">> ->
-            replication_complete(DocId);
+            replication_complete(DocId),
+            State;
         <<"error">> ->
-            stop_replication(DocId);
+            stop_replication(DocId),
+            State;
         <<"triggered">> ->
-            maybe_start_replication(DocId, JsonRepDoc);
+            maybe_start_replication(State, DocId, JsonRepDoc);
         undefined ->
-            maybe_start_replication(DocId, JsonRepDoc);
-        _ ->
-            ?LOG_ERROR("Invalid value for the `_replication_state` property"
-                " of the replication document `~s`", [DocId])
+            maybe_start_replication(State, DocId, JsonRepDoc)
         end
-    end,
-    ok.
+    end.
 
 
 rep_user_ctx({RepDoc}) ->
-    case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+    case get_value(<<"user_ctx">>, RepDoc) of
     undefined ->
         #user_ctx{roles = [<<"_admin">>]};
     {UserCtx} ->
         #user_ctx{
-            name = couch_util:get_value(<<"name">>, UserCtx, null),
-            roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+            name = get_value(<<"name">>, UserCtx, null),
+            roles = get_value(<<"roles">>, UserCtx, [])
         }
     end.
 
 
-maybe_start_replication(DocId, JsonRepDoc) ->
+maybe_start_replication(State, DocId, JsonRepDoc) ->
     UserCtx = rep_user_ctx(JsonRepDoc),
     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
-    case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
+    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
     [] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
-        true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
-        spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+        Pid = spawn_link(fun() ->
+            start_replication(JsonRepDoc, RepId, UserCtx)
+        end),
+        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
     [{BaseId, DocId}] ->
-        ok;
+        State;
     [{BaseId, OtherDocId}] ->
-        maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
+        ?LOG_INFO("The replication specified by the document `~s` was already"
+            " triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        State
     end.
 
 
-maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
-    case couch_util:get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
+    case get_value(<<"_replication_id">>, Props) of
     RepId ->
         ok;
     _ ->
-        ?LOG_INFO("The replication specified by the document `~s` was already"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
         couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
     end.
 
 
-
-start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
+start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
-    RepPid when is_pid(RepPid) ->
+    Pid when is_pid(Pid) ->
         ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]),
-        couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     Error ->
         couch_rep:update_rep_doc(
             RepDoc,
@@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {
                 {<<"_replication_id">>, ?l2b(Base)}
             ]
         ),
-        ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error])
+        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
     end.
 
+
 rep_doc_deleted(DocId) ->
     case stop_replication(DocId) of
-    {ok, {Base, Ext}} ->
+    {ok, RepId} ->
         ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
-            " was deleted", [Base ++ Ext, DocId]);
+            " was deleted", [pp_rep_id(RepId), DocId]);
     none ->
         ok
     end.
 
+
 replication_complete(DocId) ->
     case stop_replication(DocId) of
-    {ok, {Base, Ext}} ->
+    {ok, RepId} ->
         ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [Base ++ Ext, DocId]);
+            [pp_rep_id(RepId), DocId]);
     none ->
         ok
     end.
 
+
 stop_replication(DocId) ->
-    case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
     [{DocId, {BaseId, _} = RepId}] ->
         couch_rep:end_replication(RepId),
-        true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
-        true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
+        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
         {ok, RepId};
     [] ->
         none
     end.
 
+
 stop_all_replications() ->
+    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
+        "was deleted or changed", []),
     ets:foldl(
         fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
         ok,
-        ?DOC_TO_REP_ID_MAP
+        ?DOC_ID_TO_REP_ID
     ),
-    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
-    true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).
+    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
+    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+
+
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+    Base ++ Extension.



Mime
View raw message