couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1036705 - in /couchdb/trunk: src/couchdb/couch_rep.erl src/couchdb/couch_rep_missing_revs.erl src/couchdb/couch_rep_reader.erl src/couchdb/couch_rep_writer.erl test/etap/112-replication-missing-revs.t
Date Fri, 19 Nov 2010 01:38:41 GMT
Author: fdmanana
Date: Fri Nov 19 01:38:41 2010
New Revision: 1036705

URL: http://svn.apache.org/viewvc?rev=1036705&view=rev
Log:
Make sure that after a local database compaction the old database reference counters don't
get unreleased forever because of a
continuous (or long) replication is going on.

Same type of issue as in COUCHDB-926.
Thanks Adam Kocoloski for some suggestions.


Modified:
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl
    couchdb/trunk/test/etap/112-replication-missing-revs.t

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=1036705&r1=1036704&r2=1036705&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Nov 19 01:38:41 2010
@@ -53,7 +53,9 @@
     committed_seq = 0,
 
     stats = nil,
-    rep_doc = nil
+    rep_doc = nil,
+    source_db_update_notifier = nil,
+    target_db_update_notifier = nil
 }).
 
 %% convenience function to do a simple replication from the shell
@@ -187,7 +189,9 @@ do_init([RepId, {PostProps} = RepDoc, Us
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        rep_doc = RepDoc
+        rep_doc = RepDoc,
+        source_db_update_notifier = source_db_update_notifier(Source),
+        target_db_update_notifier = target_db_update_notifier(Target)
     },
     {ok, State}.
 
@@ -195,7 +199,21 @@ handle_call(get_result, From, #state{com
     {stop, normal, State#state{listeners=[From]}};
 handle_call(get_result, From, State) ->
     Listeners = State#state.listeners,
-    {noreply, State#state{listeners=[From|Listeners]}}.
+    {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+    {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+    {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#state{target = NewTarget}};
 
 handle_cast(do_checkpoint, State) ->
     {noreply, do_checkpoint(State)};
@@ -255,11 +273,11 @@ terminate(normal, State) ->
 terminate(shutdown, #state{listeners = Listeners} = State) ->
     % continuous replication stopped
     [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
-    do_forced_terminate(State);
+    terminate_cleanup(State);
 
 terminate(Reason, #state{listeners = Listeners} = State) ->
     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
-    do_forced_terminate(State),
+    terminate_cleanup(State),
     update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
 
 code_change(_OldVsn, State, _Extra) ->
@@ -267,11 +285,6 @@ code_change(_OldVsn, State, _Extra) ->
 
 % internal funs
 
-do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
-    ets:delete(Stats),
-    close_db(Target),
-    close_db(Source).
-
 start_replication_server(Replicator) ->
     RepId = element(1, Replicator),
     case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -399,13 +412,20 @@ do_terminate(State) ->
         false ->
             [gen_server:reply(R, retry) || R <- OtherListeners]
     end,
+    couch_task_status:update("Finishing"),
     terminate_cleanup(State).
 
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
-    couch_task_status:update("Finishing"),
-    close_db(Target),
-    close_db(Source),
-    ets:delete(Stats).
+terminate_cleanup(State) ->
+    close_db(State#state.source),
+    close_db(State#state.target),
+    stop_db_update_notifier(State#state.source_db_update_notifier),
+    stop_db_update_notifier(State#state.target_db_update_notifier),
+    ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+    ok;
+stop_db_update_notifier(Notifier) ->
+    couch_db_update_notifier:stop(Notifier).
 
 has_session_id(_SessionId, []) ->
     false;
@@ -887,3 +907,27 @@ ensure_rep_ddoc_exists(RepDb, DDocID) ->
         {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
     end,
     ok.
+
+source_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_source_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+source_db_update_notifier(_) ->
+    nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_target_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+target_db_update_notifier(_) ->
+    nil.

Modified: couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl?rev=1036705&r1=1036704&r2=1036705&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl Fri Nov 19 01:38:41 2010
@@ -24,7 +24,6 @@
 -record (state, {
     changes_loop,
     changes_from = nil,
-    target,
     parent,
     complete = false,
     count = 0,
@@ -44,11 +43,11 @@ next(Server) ->
 stop(Server) ->
     gen_server:call(Server, stop).
 
-init([Parent, Target, ChangesFeed, _PostProps]) ->
+init([Parent, _Target, ChangesFeed, _PostProps]) ->
     process_flag(trap_exit, true),
     Self = self(),
-    Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end),
-    {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
+    Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end),
+    {ok, #state{changes_loop=Pid, parent=Parent}}.
 
 handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
     State#state.parent ! {update_stats, missing_revs, length(Revs)},
@@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) 
 handle_changes_loop_exit(Reason, State) ->
     {stop, Reason, State#state{changes_loop=nil}}.
 
-changes_loop(OurServer, SourceChangesServer, Target) ->
+changes_loop(OurServer, SourceChangesServer, Parent) ->
     case couch_rep_changes_feed:next(SourceChangesServer) of
     complete ->
         exit(normal);
     Changes ->
+        {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
         MissingRevs = get_missing_revs(Target, Changes),
         gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity)
     end,
-    changes_loop(OurServer, SourceChangesServer, Target).
+    changes_loop(OurServer, SourceChangesServer, Parent).
 
 get_missing_revs(#http_db{}=Target, Changes) ->
     Transform = fun({Props}) ->

Modified: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=1036705&r1=1036704&r2=1036705&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Fri Nov 19 01:38:41 2010
@@ -57,7 +57,8 @@ init([Parent, Source, MissingRevs, _Post
         ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
     true -> ok end,
     Self = self(),
-    ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+    ReaderLoop = spawn_link(
+        fun() -> reader_loop(Self, Parent, Source, MissingRevs) end),
     State = #state{
         parent = Parent,
         source = Source,
@@ -247,7 +248,7 @@ open_doc_revs(#http_db{url = Url} = DbS,
     end,
     lists:reverse(lists:foldl(Transform, [], JsonResults)).
 
-reader_loop(ReaderServer, Source, MissingRevsServer) ->
+reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
         exit(complete);
@@ -260,22 +261,23 @@ reader_loop(ReaderServer, Source, Missin
         #http_db{} ->
             [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
                 infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
-            reader_loop(ReaderServer, Source, MissingRevsServer);
+            reader_loop(ReaderServer, Parent, Source, MissingRevsServer);
         _Local ->
-            Source2 = maybe_reopen_db(Source, HighSeq),
+            {ok, Source1} = gen_server:call(Parent, get_source_db, infinity),
+            Source2 = maybe_reopen_db(Source1, HighSeq),
             lists:foreach(fun({Id,Seq,Revs}) ->
                 {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
                 JustTheDocs = [Doc || {ok, Doc} <- Docs],
                 gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
                     infinity)
             end, SortedIdsRevs),
-            reader_loop(ReaderServer, Source2, MissingRevsServer)
+            couch_db:close(Source2),
+            reader_loop(ReaderServer, Parent, Source2, MissingRevsServer)
         end
     end.
 
 maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
     {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
-    couch_db:close(Db),
     NewDb;
 maybe_reopen_db(Db, _HighSeq) ->
     Db.

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=1036705&r1=1036704&r2=1036705&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Fri Nov 19 01:38:41 2010
@@ -16,16 +16,17 @@
 
 -include("couch_db.hrl").
 
-start_link(Parent, Target, Reader, _PostProps) ->
-    {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+start_link(Parent, _Target, Reader, _PostProps) ->
+    {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.
 
-writer_loop(Parent, Reader, Target) ->
+writer_loop(Parent, Reader) ->
     case couch_rep_reader:next(Reader) of
     {complete, FinalSeq} ->
         Parent ! {writer_checkpoint, FinalSeq},
         ok;
     {HighSeq, Docs} ->
         DocCount = length(Docs),
+        {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
         try write_docs(Target, Docs) of
         {ok, []} ->
             Parent ! {update_stats, docs_written, DocCount};
@@ -41,7 +42,7 @@ writer_loop(Parent, Reader, Target) ->
         Parent ! {writer_checkpoint, HighSeq},
         couch_rep_att:cleanup(),
         couch_util:should_flush(),
-        writer_loop(Parent, Reader, Target)
+        writer_loop(Parent, Reader)
     end.
 
 write_docs(#http_db{} = Db, Docs) ->

Modified: couchdb/trunk/test/etap/112-replication-missing-revs.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/112-replication-missing-revs.t?rev=1036705&r1=1036704&r2=1036705&view=diff
==============================================================================
--- couchdb/trunk/test/etap/112-replication-missing-revs.t (original)
+++ couchdb/trunk/test/etap/112-replication-missing-revs.t Fri Nov 19 01:38:41 2010
@@ -188,8 +188,21 @@ start_changes_feed(remote, Since, Contin
     Db = #http_db{url = "http://127.0.0.1:5984/etap-test-source/"},
     couch_rep_changes_feed:start_link(self(), Db, Since, Props).
 
+couch_rep_pid(Db) ->
+    spawn(fun() -> couch_rep_pid_loop(Db) end).
+
+couch_rep_pid_loop(Db) ->
+    receive
+    {'$gen_call', From, get_target_db} ->
+        gen_server:reply(From, {ok, Db})
+    end,
+    couch_rep_pid_loop(Db).
+
 start_missing_revs(local, Changes) ->
-    couch_rep_missing_revs:start_link(self(), get_db(target), Changes, []);
+    TargetDb = get_db(target),
+    MainPid = couch_rep_pid(TargetDb),
+    couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []);
 start_missing_revs(remote, Changes) ->
-    Db = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
-    couch_rep_missing_revs:start_link(self(), Db, Changes, []).
+    TargetDb = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
+    MainPid = couch_rep_pid(TargetDb),
+    couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []).



Mime
View raw message