couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1036711 - in /couchdb/branches/1.0.x: src/couchdb/couch_rep.erl src/couchdb/couch_rep_missing_revs.erl src/couchdb/couch_rep_reader.erl src/couchdb/couch_rep_writer.erl test/etap/112-replication-missing-revs.t
Date Fri, 19 Nov 2010 02:05:45 GMT
Author: fdmanana
Date: Fri Nov 19 02:05:45 2010
New Revision: 1036711

URL: http://svn.apache.org/viewvc?rev=1036711&view=rev
Log:
Merged revision 1036705 from trunk:

Make sure that after a local database compaction the old database reference counters don't
remain unreleased forever because a
continuous (or long) replication is going on.

Same type of issue as in COUCHDB-926.
Thanks Adam Kocoloski for some suggestions.


Modified:
    couchdb/branches/1.0.x/src/couchdb/couch_rep.erl
    couchdb/branches/1.0.x/src/couchdb/couch_rep_missing_revs.erl
    couchdb/branches/1.0.x/src/couchdb/couch_rep_reader.erl
    couchdb/branches/1.0.x/src/couchdb/couch_rep_writer.erl
    couchdb/branches/1.0.x/test/etap/112-replication-missing-revs.t

Modified: couchdb/branches/1.0.x/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.0.x/src/couchdb/couch_rep.erl?rev=1036711&r1=1036710&r2=1036711&view=diff
==============================================================================
--- couchdb/branches/1.0.x/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/1.0.x/src/couchdb/couch_rep.erl Fri Nov 19 02:05:45 2010
@@ -47,7 +47,9 @@
     committed_seq = 0,
 
     stats = nil,
-    doc_ids = nil
+    doc_ids = nil,
+    source_db_update_notifier = nil,
+    target_db_update_notifier = nil
 }).
 
 %% convenience function to do a simple replication from the shell
@@ -196,7 +198,9 @@ do_init([RepId, {PostProps}, UserCtx] = 
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        doc_ids = DocIds
+        doc_ids = DocIds,
+        source_db_update_notifier = source_db_update_notifier(Source),
+        target_db_update_notifier = target_db_update_notifier(Target)
     },
     {ok, State}.
 
@@ -204,7 +208,21 @@ handle_call(get_result, From, #state{com
     {stop, normal, State#state{listeners=[From]}};
 handle_call(get_result, From, State) ->
     Listeners = State#state.listeners,
-    {noreply, State#state{listeners=[From|Listeners]}}.
+    {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+    {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+    {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#state{target = NewTarget}};
 
 handle_cast(do_checkpoint, State) ->
     {noreply, do_checkpoint(State)};
@@ -422,13 +440,20 @@ do_terminate(State) ->
         false ->
             [gen_server:reply(R, retry) || R <- OtherListeners]
     end,
+    couch_task_status:update("Finishing"),
     terminate_cleanup(State).
 
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
-    couch_task_status:update("Finishing"),
-    close_db(Target),
-    close_db(Source),
-    ets:delete(Stats).
+terminate_cleanup(State) ->
+    close_db(State#state.source),
+    close_db(State#state.target),
+    stop_db_update_notifier(State#state.source_db_update_notifier),
+    stop_db_update_notifier(State#state.target_db_update_notifier),
+    ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+    ok;
+stop_db_update_notifier(Notifier) ->
+    couch_db_update_notifier:stop(Notifier).
 
 has_session_id(_SessionId, []) ->
     false;
@@ -752,3 +777,27 @@ parse_proxy_params(ProxyUrl) ->
         true ->
             [{proxy_user, User}, {proxy_password, Passwd}]
         end.
+
+source_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_source_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+source_db_update_notifier(_) ->
+    nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_target_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+target_db_update_notifier(_) ->
+    nil.

Modified: couchdb/branches/1.0.x/src/couchdb/couch_rep_missing_revs.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.0.x/src/couchdb/couch_rep_missing_revs.erl?rev=1036711&r1=1036710&r2=1036711&view=diff
==============================================================================
--- couchdb/branches/1.0.x/src/couchdb/couch_rep_missing_revs.erl (original)
+++ couchdb/branches/1.0.x/src/couchdb/couch_rep_missing_revs.erl Fri Nov 19 02:05:45 2010
@@ -24,7 +24,6 @@
 -record (state, {
     changes_loop,
     changes_from = nil,
-    target,
     parent,
     complete = false,
     count = 0,
@@ -44,11 +43,11 @@ next(Server) ->
 stop(Server) ->
     gen_server:call(Server, stop).
 
-init([Parent, Target, ChangesFeed, _PostProps]) ->
+init([Parent, _Target, ChangesFeed, _PostProps]) ->
     process_flag(trap_exit, true),
     Self = self(),
-    Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end),
-    {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
+    Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end),
+    {ok, #state{changes_loop=Pid, parent=Parent}}.
 
 handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
     State#state.parent ! {update_stats, missing_revs, length(Revs)},
@@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) 
 handle_changes_loop_exit(Reason, State) ->
     {stop, Reason, State#state{changes_loop=nil}}.
 
-changes_loop(OurServer, SourceChangesServer, Target) ->
+changes_loop(OurServer, SourceChangesServer, Parent) ->
     case couch_rep_changes_feed:next(SourceChangesServer) of
     complete ->
         exit(normal);
     Changes ->
+        {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
         MissingRevs = get_missing_revs(Target, Changes),
         gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity)
     end,
-    changes_loop(OurServer, SourceChangesServer, Target).
+    changes_loop(OurServer, SourceChangesServer, Parent).
 
 get_missing_revs(#http_db{}=Target, Changes) ->
     Transform = fun({Props}) ->

Modified: couchdb/branches/1.0.x/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.0.x/src/couchdb/couch_rep_reader.erl?rev=1036711&r1=1036710&r2=1036711&view=diff
==============================================================================
--- couchdb/branches/1.0.x/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/branches/1.0.x/src/couchdb/couch_rep_reader.erl Fri Nov 19 02:05:45 2010
@@ -60,7 +60,7 @@ init([Parent, Source, MissingRevs_or_Doc
     true -> ok end,
     Self = self(),
     ReaderLoop = spawn_link(
-        fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+        fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end
     ),
     MissingRevs = case MissingRevs_or_DocIds of
     Pid when is_pid(Pid) ->
@@ -281,12 +281,13 @@ open_doc(#http_db{url = Url} = DbS, DocI
         []
     end.
 
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
-    case Source of
+reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) ->
+    case Source1 of
     #http_db{} ->
         [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
             infinity) || Id <- DocIds];
     _LocalDb ->
+        {ok, Source} = gen_server:call(Parent, get_source_db, infinity),
         Docs = lists:foldr(fun(Id, Acc) ->
             case couch_db:open_doc(Source, Id) of
             {ok, Doc} ->
@@ -299,7 +300,7 @@ reader_loop(ReaderServer, Source, DocIds
     end,
     exit(complete);
     
-reader_loop(ReaderServer, Source, MissingRevsServer) ->
+reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
         exit(complete);
@@ -312,22 +313,23 @@ reader_loop(ReaderServer, Source, Missin
         #http_db{} ->
             [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
                 infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
-            reader_loop(ReaderServer, Source, MissingRevsServer);
+            reader_loop(ReaderServer, Parent, Source, MissingRevsServer);
         _Local ->
-            Source2 = maybe_reopen_db(Source, HighSeq),
+            {ok, Source1} = gen_server:call(Parent, get_source_db, infinity),
+            Source2 = maybe_reopen_db(Source1, HighSeq),
             lists:foreach(fun({Id,Seq,Revs}) ->
                 {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
                 JustTheDocs = [Doc || {ok, Doc} <- Docs],
                 gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
                     infinity)
             end, SortedIdsRevs),
-            reader_loop(ReaderServer, Source2, MissingRevsServer)
+            couch_db:close(Source2),
+            reader_loop(ReaderServer, Parent, Source2, MissingRevsServer)
         end
     end.
 
 maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
     {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
-    couch_db:close(Db),
     NewDb;
 maybe_reopen_db(Db, _HighSeq) ->
     Db.

Modified: couchdb/branches/1.0.x/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.0.x/src/couchdb/couch_rep_writer.erl?rev=1036711&r1=1036710&r2=1036711&view=diff
==============================================================================
--- couchdb/branches/1.0.x/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/branches/1.0.x/src/couchdb/couch_rep_writer.erl Fri Nov 19 02:05:45 2010
@@ -16,10 +16,10 @@
 
 -include("couch_db.hrl").
 
-start_link(Parent, Target, Reader, _PostProps) ->
-    {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+start_link(Parent, _Target, Reader, _PostProps) ->
+    {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.
 
-writer_loop(Parent, Reader, Target) ->
+writer_loop(Parent, Reader) ->
     case couch_rep_reader:next(Reader) of
     {complete, nil} ->
         ok;
@@ -28,6 +28,7 @@ writer_loop(Parent, Reader, Target) ->
         ok;
     {HighSeq, Docs} ->
         DocCount = length(Docs),
+        {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
         try write_docs(Target, Docs) of
         {ok, []} ->
             Parent ! {update_stats, docs_written, DocCount};
@@ -48,7 +49,7 @@ writer_loop(Parent, Reader, Target) ->
         end,
         couch_rep_att:cleanup(),
         couch_util:should_flush(),
-        writer_loop(Parent, Reader, Target)
+        writer_loop(Parent, Reader)
     end.
 
 write_docs(#http_db{} = Db, Docs) ->

Modified: couchdb/branches/1.0.x/test/etap/112-replication-missing-revs.t
URL: http://svn.apache.org/viewvc/couchdb/branches/1.0.x/test/etap/112-replication-missing-revs.t?rev=1036711&r1=1036710&r2=1036711&view=diff
==============================================================================
--- couchdb/branches/1.0.x/test/etap/112-replication-missing-revs.t (original)
+++ couchdb/branches/1.0.x/test/etap/112-replication-missing-revs.t Fri Nov 19 02:05:45 2010
@@ -188,8 +188,21 @@ start_changes_feed(remote, Since, Contin
     Db = #http_db{url = "http://127.0.0.1:5984/etap-test-source/"},
     couch_rep_changes_feed:start_link(self(), Db, Since, Props).
 
+couch_rep_pid(Db) ->
+    spawn(fun() -> couch_rep_pid_loop(Db) end).
+
+couch_rep_pid_loop(Db) ->
+    receive
+    {'$gen_call', From, get_target_db} ->
+        gen_server:reply(From, {ok, Db})
+    end,
+    couch_rep_pid_loop(Db).
+
 start_missing_revs(local, Changes) ->
-    couch_rep_missing_revs:start_link(self(), get_db(target), Changes, []);
+    TargetDb = get_db(target),
+    MainPid = couch_rep_pid(TargetDb),
+    couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []);
 start_missing_revs(remote, Changes) ->
-    Db = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
-    couch_rep_missing_revs:start_link(self(), Db, Changes, []).
+    TargetDb = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
+    MainPid = couch_rep_pid(TargetDb),
+    couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []).



Mime
View raw message