couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1040492 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_db.erl couch_db_updater.erl couch_replicator.erl couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl
Date Tue, 30 Nov 2010 12:23:55 GMT
Author: fdmanana
Date: Tue Nov 30 12:23:54 2010
New Revision: 1040492

URL: http://svn.apache.org/viewvc?rev=1040492&view=rev
Log:
New replicator: avoid unreleased file handles after compaction of a local endpoint (backport
of COUCHDB-926 related fixes).

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_db.erl
    couchdb/branches/new_replicator/src/couchdb/couch_db_updater.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Nov 30 12:23:54 2010
@@ -24,7 +24,6 @@
 -export([
     db_open/2,
     db_open/3,
-    maybe_reopen_db/2,
     db_close/1,
     get_db_info/1,
     update_doc/3,
@@ -190,17 +189,6 @@ open_doc(Db, Id, Options) ->
     couch_db:open_doc(Db, Id, Options).
 
 
-maybe_reopen_db(#httpdb{} = Db, _TargetSeq) ->
-    Db;
-maybe_reopen_db(#db{update_seq = UpSeq, main_pid = Pid} = Db, TargetSeq) ->
-    case TargetSeq > UpSeq of
-    true ->
-        {ok, Db2} = gen_server:call(Pid, get_db, infinity),
-        Db2;
-    false ->
-        Db
-    end.
-
 update_doc(Db, Doc, Options) ->
     update_doc(Db, Doc, Options, interactive_edit).
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db.erl Tue Nov 30 12:23:54 2010
@@ -27,6 +27,7 @@
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
 -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
 -export([check_is_admin/1, check_is_reader/1]).
+-export([reopen/1]).
 
 -include("couch_db.hrl").
 
@@ -86,6 +87,18 @@ open(DbName, Options) ->
         Else -> Else
     end.
 
+reopen(#db{main_pid = Pid, fd_ref_counter = OldRefCntr, user_ctx = UserCtx}) ->
+    {ok, #db{fd_ref_counter = NewRefCntr} = NewDb} =
+        gen_server:call(Pid, get_db, infinity),
+    case NewRefCntr =:= OldRefCntr of
+    true ->
+        ok;
+    false ->
+        couch_ref_counter:add(NewRefCntr),
+        couch_ref_counter:drop(OldRefCntr)
+    end,
+    {ok, NewDb#db{user_ctx = UserCtx}}.
+
 ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
     ok = gen_server:call(UpdatePid, full_commit, infinity),
     {ok, StartTime}.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db_updater.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db_updater.erl Tue Nov 30 12:23:54 2010
@@ -175,6 +175,7 @@ handle_cast({compact_done, CompactFilepa
         ok = file:rename(CompactFilepath, Filepath),
         close_db(Db),
         ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+        couch_db_update_notifier:notify({compacted, NewDb2#db.name}),
         ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
         {noreply, NewDb2#db{compactor_pid=nil}};
     false ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Tue Nov 30 12:23:54 2010
@@ -58,7 +58,9 @@
     doc_copiers,
     seqs_in_progress = gb_sets:empty(),
     stats = #rep_stats{},
-    session_id
+    session_id,
+    source_db_update_notifier = nil,
+    target_db_update_notifier = nil
     }).
 
 
@@ -326,6 +328,14 @@ handle_call(Msg, _From, State) ->
     {stop, unexpected_sync_message, State}.
 
 
+handle_cast(reopen_source_db, #rep_state{source = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#rep_state{source = NewSource}};
+
+handle_cast(reopen_target_db, #rep_state{target = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#rep_state{target = NewTarget}};
+
 handle_cast(checkpoint, State) ->
     State2 = do_checkpoint(State),
     {noreply, State2#rep_state{timer = start_timer(State)}};
@@ -373,9 +383,11 @@ terminate(Reason, #rep_state{rep_details
     couch_replication_notifier:notify({error, RepId, Reason}).
 
 
-terminate_cleanup(#rep_state{source = Source, target = Target}) ->
-    couch_api_wrap:db_close(Source),
-    couch_api_wrap:db_close(Target).
+terminate_cleanup(State) ->
+    stop_db_update_notifier(State#rep_state.source_db_update_notifier),
+    stop_db_update_notifier(State#rep_state.target_db_update_notifier),
+    couch_api_wrap:db_close(State#rep_state.source),
+    couch_api_wrap:db_close(State#rep_state.target).
 
 
 do_last_checkpoint(State) ->
@@ -455,7 +467,9 @@ init_state(Rep) ->
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random()
+        session_id = couch_uuids:random(),
+        source_db_update_notifier = source_db_update_notifier(Source),
+        target_db_update_notifier = target_db_update_notifier(Target)
     },
     State#rep_state{timer = start_timer(State)}.
 
@@ -711,3 +725,34 @@ sum_stats([Stats1 | RestStats]) ->
             }
         end,
         Stats1, RestStats).
+
+
+source_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_source_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+source_db_update_notifier(_) ->
+    nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, reopen_target_db);
+            (_) ->
+                ok
+        end),
+    Notifier;
+target_db_update_notifier(_) ->
+    nil.
+
+
+stop_db_update_notifier(nil) ->
+    ok;
+stop_db_update_notifier(Notifier) ->
+    couch_db_update_notifier:stop(Notifier).

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Tue Nov 30
12:23:54 2010
@@ -88,14 +88,15 @@ handle_call({fetch_doc, Params}, {Pid, _
 
 handle_call({add_doc, Doc}, _From, #state{target = Target} = State) ->
     #state{docs = DocAcc, size_docs = SizeAcc, stats = S} = State,
-    {DocAcc2, SizeAcc2, W, F} = maybe_flush_docs(Target, DocAcc, SizeAcc, Doc),
+    Target2 = reopen_db(Target),
+    {DocAcc2, SizeAcc2, W, F} = maybe_flush_docs(Target2, DocAcc, SizeAcc, Doc),
     NewStats = S#rep_stats{
         docs_read = S#rep_stats.docs_read + 1,
         docs_written = S#rep_stats.docs_written + W,
         doc_write_failures = S#rep_stats.doc_write_failures + F
     },
     NewState = State#state{
-        docs = DocAcc2, size_docs = SizeAcc2, stats = NewStats
+        docs = DocAcc2, size_docs = SizeAcc2, stats = NewStats, target = Target2
     },
     {reply, ok, NewState};
 
@@ -111,11 +112,11 @@ handle_call({add_write_stats, Stats}, _F
 
 handle_call({flush, ReportSeq}, {Pid, _} = From,
     #state{loop = Pid, writer = nil, report_seq = nil,
-        pending_flush = nil} = State) ->
+        pending_flush = nil, target = Target, docs = DocAcc} = State) ->
     State2 = case State#state.readers of
     [] ->
-        Writer = spawn_writer(State#state.target, State#state.docs),
-        State#state{writer = Writer};
+        {Target2, Writer} = spawn_writer(Target, DocAcc),
+        State#state{writer = Writer, target = Target2};
     _ ->
         State
     end,
@@ -165,9 +166,11 @@ handle_info({'EXIT', Pid, normal}, #stat
             case (Flush =/= nil) andalso (Writer =:= nil) andalso
                 (Readers2 =:= [])  of
             true ->
+                {Target2, Writer2} = spawn_writer(Target, Docs),
                 State#state{
                     readers = Readers2,
-                    writer = spawn_writer(Target, Docs)
+                    writer = Writer2,
+                    target = Target2
                 };
             false ->
                 State#state{readers = Readers2}
@@ -216,8 +219,8 @@ queue_fetch_loop(Parent, MissingRevsQueu
     end.
 
 
-spawn_doc_reader(Source, {_, _, _, Seq} = FetchParams) ->
-    Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
+spawn_doc_reader(Source, FetchParams) ->
+    Source2 = reopen_db(Source),
     Parent = self(),
     Pid = spawn_link(fun() -> fetch_doc(Parent, Source2, FetchParams) end),
     {Pid, Source2}.
@@ -236,16 +239,18 @@ doc_handler(_, Parent) ->
 
 
 spawn_writer(Target, DocList) ->
+    Target2 = reopen_db(Target),
     Parent = self(),
-    spawn_link(
+    Pid = spawn_link(
         fun() ->
-            {Written, Failed} = flush_docs(Target, DocList),
+            {Written, Failed} = flush_docs(Target2, DocList),
             Stats = #rep_stats{
                 docs_written = Written,
                 doc_write_failures = Failed
             },
             ok = gen_server:call(Parent, {add_write_stats, Stats}, infinity)
-        end).
+        end),
+    {Target2, Pid}.
 
 
 maybe_flush_docs(#httpdb{} = Target, DocAcc, SizeAcc, Doc) ->
@@ -304,3 +309,10 @@ flush_docs(Target, Doc) ->
     _ ->
         {0, 1}
     end.
+
+
+reopen_db(#db{main_pid = Pid, user_ctx = UserCtx}) ->
+    {ok, NewDb} = gen_server:call(Pid, get_db, infinity),
+    NewDb#db{user_ctx = UserCtx};
+reopen_db(HttpDb) ->
+    HttpDb.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1040492&r1=1040491&r2=1040492&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Tue Nov 30
12:23:54 2010
@@ -38,7 +38,8 @@ missing_revs_finder_loop(Cp, Target, Cha
                 #doc_info{id=Id, revs=RevsInfo} <- DocInfos],
         ?LOG_DEBUG("Revs finder ~p got ~p IdRev pairs from queue",
             [self(), length(IdRevs)]),
-        {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+        Target2 = reopen_db(Target),
+        {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
         ?LOG_DEBUG("Revs finder ~p found ~p missing IdRev pairs",
             [self(), length(Missing)]),
         % Figured out which on the target are missing.
@@ -47,7 +48,7 @@ missing_revs_finder_loop(Cp, Target, Cha
         % incremental attachment replication, so the source only needs to send
         % attachments modified since the common ancestor on target.
         queue_missing_revs(Missing, DocInfos, RevsQueue, Cp),
-        missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize)
+        missing_revs_finder_loop(Cp, Target2, ChangesQueue, RevsQueue, BatchSize)
     end.
 
 
@@ -68,3 +69,10 @@ queue_missing_revs(Missing, DocInfos, Qu
     ok = gen_server:cast(Cp, {seq_start, {LargestSeq, MissingCount}}),
     ok = couch_work_queue:queue(
            Queue, {LargestSeq, QueueItemList}).
+
+
+reopen_db(#db{main_pid = Pid, user_ctx = UserCtx}) ->
+    {ok, NewDb} = gen_server:call(Pid, get_db, infinity),
+    NewDb#db{user_ctx = UserCtx};
+reopen_db(HttpDb) ->
+    HttpDb.



Mime
View raw message