couchdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Benoit Chesneau <bchesn...@gmail.com>
Subject Re: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
Date Tue, 25 Jan 2011 17:31:11 GMT
On Mon, Jan 24, 2011 at 2:46 PM,  <fdmanana@apache.org> wrote:
> Author: fdmanana
> Date: Mon Jan 24 13:46:11 2011
> New Revision: 1062772
>
> URL: http://svn.apache.org/viewvc?rev=1062772&view=rev
> Log:
> Refactoring of the replicator database listener
>
> Simpler implementation and more reliable behaviour when the replicator
> database is deleted or changed on the fly.
>
> Modified:
>    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
>
> Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062772&r1=1062771&r2=1062772&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 13:46:11 2011
> @@ -18,98 +18,113 @@
>
>  -include("couch_db.hrl").
>
> --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
> --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
> +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
> +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
>
>  -record(state, {
>     changes_feed_loop = nil,
> -    changes_queue = nil,
> -    changes_processor = nil,
> -    db_notifier = nil
> +    db_notifier = nil,
> +    rep_db_name = nil,
> +    rep_start_pids = []
>  }).
>
> +-import(couch_util, [
> +    get_value/2,
> +    get_value/3
> +]).
> +
>
>  start_link() ->
>     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
>
>  init(_) ->
>     process_flag(trap_exit, true),
> -    {ok, Queue} = couch_work_queue:new(
> -        [{max_size, 1024 * 1024}, {max_items, 1000}]),
> -    {ok, Processor} = changes_processor(Queue),
> -    {ok, Loop} = changes_feed_loop(Queue),
> +    ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
> +    ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
>     Server = self(),
>     ok = couch_config:register(
> -        fun("replicator", "db") ->
> -            ok = gen_server:cast(Server, rep_db_changed)
> +        fun("replicator", "db", NewName) ->
> +            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
>         end
>     ),
> +    {Loop, RepDbName} = changes_feed_loop(),
>     {ok, #state{
>         changes_feed_loop = Loop,
> -        changes_queue = Queue,
> -        changes_processor = Processor,
> +        rep_db_name = RepDbName,
>         db_notifier = db_update_notifier()}
>     }.
>
>
> +handle_call({rep_db_update, Change}, _From, State) ->
> +    {reply, ok, process_update(State, Change)};
> +
>  handle_call(Msg, From, State) ->
>     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
>         [Msg, From]),
>     {stop, {error, {unexpected_call, Msg}}, State}.
>
>
> -handle_cast(rep_db_changed, State) ->
> -    #state{
> -        changes_feed_loop = Loop,
> -        changes_queue = Queue
> -    } = State,
> -    catch unlink(Loop),
> -    catch exit(Loop, rep_db_changed),
> -    couch_work_queue:queue(Queue, stop_all_replications),
> -    {ok, NewLoop} = changes_feed_loop(Queue),
> -    {noreply, State#state{changes_feed_loop = NewLoop}};
> -
> -handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) ->
> -    catch unlink(Loop),
> -    catch exit(Loop, rep_db_changed),
> -    {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
> -    {noreply, State#state{changes_feed_loop = NewLoop}};
> +handle_cast({rep_db_changed, NewName},
> +        #state{rep_db_name = NewName} = State) ->
> +    {noreply, State};
> +
> +handle_cast({rep_db_changed, _NewName}, State) ->
> +    {noreply, restart(State)};
> +
> +handle_cast({rep_db_created, NewName},
> +        #state{rep_db_name = NewName} = State) ->
> +    {noreply, State};
> +
> +handle_cast({rep_db_created, _NewName}, State) ->
> +    {noreply, restart(State)};
>
>  handle_cast(Msg, State) ->
>     ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
>     {stop, {error, {unexpected_cast, Msg}}, State}.
>
> +
>  handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
>     % replicator DB deleted
> -    couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
> -    {noreply, State#state{changes_feed_loop = nil}};
> +    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
>
>  handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
>     ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
>     {stop, {db_update_notifier_died, Reason}, State};
>
> -handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
> -    ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
> -    {stop, {rep_db_changes_processor_died, Reason}, State}.
> +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
> +    % one of the replication start processes terminated successfully
> +    {noreply, State#state{rep_start_pids = Pids -- [From]}};
> +
> +handle_info(Msg, State) ->
> +    ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]),
> +    {stop, {unexpected_msg, Msg}, State}.
>
>
>  terminate(_Reason, State) ->
>     #state{
> +        rep_start_pids = StartPids,
>         changes_feed_loop = Loop,
> -        changes_queue = Queue
> +        db_notifier = Notifier
>     } = State,
> -    exit(Loop, stop),
> -    % closing the queue will cause changes_processor to shutdown
> -    couch_work_queue:close(Queue),
> -    ok.
> +    stop_all_replications(),
> +    lists:foreach(
> +        fun(Pid) ->
> +            catch unlink(Pid),
> +            catch exit(Pid, stop)
> +        end,
> +        [Loop | StartPids]),
> +    true = ets:delete(?REP_ID_TO_DOC_ID),
> +    true = ets:delete(?DOC_ID_TO_REP_ID),
> +    couch_db_update_notifier:stop(Notifier).
>
>
>  code_change(_OldVsn, State, _Extra) ->
>     {ok, State}.
>
>
> -changes_feed_loop(ChangesQueue) ->
> +changes_feed_loop() ->
>     {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
> +    Server = self(),
>     Pid = spawn_link(
>         fun() ->
>             ChangesFeedFun = couch_changes:handle_changes(
> @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->
>                 fun({change, Change, _}, _) ->
>                     case has_valid_rep_id(Change) of
>                     true ->
> -                        couch_work_queue:queue(ChangesQueue, Change);
> +                        ok = gen_server:call(
> +                            Server, {rep_db_update, Change}, infinity);
>                     false ->
>                         ok
>                     end;
> @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->
>         end
>     ),
>     couch_db:close(RepDb),
> -    {ok, Pid}.
> +    {Pid, couch_db:name(RepDb)}.
> +
> +
> +has_valid_rep_id({Change}) ->
> +    has_valid_rep_id(get_value(<<"id">>, Change));
> +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
> +    false;
> +has_valid_rep_id(_Else) ->
> +    true.
>
>
>  db_update_notifier() ->
> @@ -146,121 +170,106 @@ db_update_notifier() ->
>         fun({created, DbName}) ->
>             case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
>             DbName ->
> -                ok = gen_server:cast(Server, rep_db_created);
> +                ok = gen_server:cast(Server, {rep_db_created, DbName});
>             _ ->
>                 ok
>             end;
>         (_) ->
> +            % no need to handle the 'deleted' event - the changes feed loop
> +            % dies when the database is deleted
>             ok
>         end
>     ),
>     Notifier.
>
>
> -changes_processor(ChangesQueue) ->
> -    Pid = spawn_link(
> -        fun() ->
> -            ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
> -            ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
> -            consume_changes(ChangesQueue),
> -            true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
> -            true = ets:delete(?DOC_TO_REP_ID_MAP)
> -        end
> -    ),
> -    {ok, Pid}.
> -
> -
> -consume_changes(ChangesQueue) ->
> -    case couch_work_queue:dequeue(ChangesQueue) of
> -    closed ->
> -        ok;
> -    {ok, Changes} ->
> -        lists:foreach(fun process_change/1, Changes),
> -        consume_changes(ChangesQueue)
> -    end.
> -
> -
> -has_valid_rep_id({Change}) ->
> -    has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
> -has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
> -    false;
> -has_valid_rep_id(_Else) ->
> -    true.
> +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
> +    stop_all_replications(),
> +    lists:foreach(
> +        fun(Pid) ->
> +            catch unlink(Pid),
> +            catch exit(Pid, rep_db_changed)
> +        end,
> +        [Loop | StartPids]),
> +    {NewLoop, NewRepDbName} = changes_feed_loop(),
> +    State#state{
> +        changes_feed_loop = NewLoop,
> +        rep_db_name = NewRepDbName,
> +        rep_start_pids = []
> +    }.
>
> -process_change(stop_all_replications) ->
> -    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
> -        "was deleted or changed", []),
> -    stop_all_replications();
>
> -process_change({Change}) ->
> -    {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
> -    DocId = couch_util:get_value(<<"_id">>, RepProps),
> -    case couch_util:get_value(<<"deleted">>, Change, false) of
> +process_update(State, {Change}) ->
> +    {RepProps} = JsonRepDoc = get_value(doc, Change),
> +    DocId = get_value(<<"_id">>, RepProps),
> +    case get_value(<<"deleted">>, Change, false) of
>     true ->
> -        rep_doc_deleted(DocId);
> +        rep_doc_deleted(DocId),
> +        State;
>     false ->
> -        case couch_util:get_value(<<"_replication_state">>, RepProps) of
> +        case get_value(<<"_replication_state">>, RepProps) of
>         <<"completed">> ->
> -            replication_complete(DocId);
> +            replication_complete(DocId),
> +            State;
>         <<"error">> ->
> -            stop_replication(DocId);
> +            stop_replication(DocId),
> +            State;
>         <<"triggered">> ->
> -            maybe_start_replication(DocId, JsonRepDoc);
> +            maybe_start_replication(State, DocId, JsonRepDoc);
>         undefined ->
> -            maybe_start_replication(DocId, JsonRepDoc);
> -        _ ->
> -            ?LOG_ERROR("Invalid value for the `_replication_state` property"
> -                " of the replication document `~s`", [DocId])
> +            maybe_start_replication(State, DocId, JsonRepDoc)
>         end
> -    end,
> -    ok.
> +    end.
>
>
>  rep_user_ctx({RepDoc}) ->
> -    case couch_util:get_value(<<"user_ctx">>, RepDoc) of
> +    case get_value(<<"user_ctx">>, RepDoc) of
>     undefined ->
>         #user_ctx{roles = [<<"_admin">>]};
>     {UserCtx} ->
>         #user_ctx{
> -            name = couch_util:get_value(<<"name">>, UserCtx, null),
> -            roles = couch_util:get_value(<<"roles">>, UserCtx, [])
> +            name = get_value(<<"name">>, UserCtx, null),
> +            roles = get_value(<<"roles">>, UserCtx, [])
>         }
>     end.
>
>
> -maybe_start_replication(DocId, JsonRepDoc) ->
> +maybe_start_replication(State, DocId, JsonRepDoc) ->
>     UserCtx = rep_user_ctx(JsonRepDoc),
>     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
> -    case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
> +    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
>     [] ->
> -        true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
> -        true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
> -        spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
> +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
> +        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
> +        Pid = spawn_link(fun() ->
> +            start_replication(JsonRepDoc, RepId, UserCtx)
> +        end),
> +        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
>     [{BaseId, DocId}] ->
> -        ok;
> +        State;
>     [{BaseId, OtherDocId}] ->
> -        maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
> +        ?LOG_INFO("The replication specified by the document `~s` was already"
> +            " triggered by the document `~s`", [DocId, OtherDocId]),
> +        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
> +        State
>     end.
>
>
> -maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
> -    case couch_util:get_value(<<"_replication_id">>, Props) of
> +maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
> +    case get_value(<<"_replication_id">>, Props) of
>     RepId ->
>         ok;
>     _ ->
> -        ?LOG_INFO("The replication specified by the document `~s` was already"
> -            " triggered by the document `~s`", [DocId, OtherDocId]),
>         couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
>     end.
>
>
> -
> -start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
> +start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
>     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
> -    RepPid when is_pid(RepPid) ->
> +    Pid when is_pid(Pid) ->
>         ?LOG_INFO("Document `~s` triggered replication `~s`",
> -            [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]),
> -        couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
> +            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
> +        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
>     Error ->
>         couch_rep:update_rep_doc(
>             RepDoc,
> @@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {
>                 {<<"_replication_id">>, ?l2b(Base)}
>             ]
>         ),
> -        ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error])
> +        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
>     end.
>
> +
>  rep_doc_deleted(DocId) ->
>     case stop_replication(DocId) of
> -    {ok, {Base, Ext}} ->
> +    {ok, RepId} ->
>         ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
> -            " was deleted", [Base ++ Ext, DocId]);
> +            " was deleted", [pp_rep_id(RepId), DocId]);
>     none ->
>         ok
>     end.
>
> +
>  replication_complete(DocId) ->
>     case stop_replication(DocId) of
> -    {ok, {Base, Ext}} ->
> +    {ok, RepId} ->
>         ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
> -            [Base ++ Ext, DocId]);
> +            [pp_rep_id(RepId), DocId]);
>     none ->
>         ok
>     end.
>
> +
>  stop_replication(DocId) ->
> -    case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
> +    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
>     [{DocId, {BaseId, _} = RepId}] ->
>         couch_rep:end_replication(RepId),
> -        true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
> -        true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
> +        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
> +        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
>         {ok, RepId};
>     [] ->
>         none
>     end.
>
> +
>  stop_all_replications() ->
> +    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
> +        "was deleted or changed", []),
>     ets:foldl(
>         fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
>         ok,
> -        ?DOC_TO_REP_ID_MAP
> +        ?DOC_ID_TO_REP_ID
>     ),
> -    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
> -    true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).
> +    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
> +    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
> +
> +
> +% pretty-print replication id
> +pp_rep_id({Base, Extension}) ->
> +    Base ++ Extension.
>
>
>

Is there any reason you are using named tables here? Why not just create
unnamed ETS tables and keep their table ids in the #state record? Also, why
use macros for the table names?

Mime
View raw message