couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [5/5] couch-replicator commit: updated refs/heads/pu to 4965655
Date Thu, 01 May 2014 11:40:48 GMT
Support clustered and non-clustered _replicator databases


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/4965655c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/4965655c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/4965655c

Branch: refs/heads/pu
Commit: 4965655cabeb563f23e3d9af312960d03f1ef721
Parents: 9e261d5
Author: Robert Newson <rnewson@apache.org>
Authored: Wed Apr 30 11:15:26 2014 +0100
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Apr 30 17:34:32 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 83 ++++++++++++++---------------------
 1 file changed, 33 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/4965655c/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index eb8b0ae..66d48b8 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -27,6 +27,7 @@
 -export([handle_config_change/5]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator.hrl").
 -include("couch_replicator_js_functions.hrl").
 
@@ -130,6 +131,7 @@ init(_) ->
     ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
     % Automatically start node local changes feed loop
     LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
+    ensure_rep_db_exists(LocalRepDb),
     Pid = changes_feed_loop(LocalRepDb, 0),
     {ok, #state{
         db_notifier = db_update_notifier(),
@@ -180,6 +182,7 @@ handle_call({resume_scan, DbName}, _From, State) ->
         [] -> 0;
         [{DbName, EndSeq}] -> EndSeq
     end,
+    ensure_rep_ddoc_exists(DbName),
     Pid = changes_feed_loop(DbName, Since),
     couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
     {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
@@ -267,39 +270,7 @@ terminate(_Reason, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-changes_feed_loop(<<"shards/", _/binary>>=DbName, Since) ->
-    Server = self(),
-    Pid = spawn_link(
-        fun() ->
-            fabric:changes(DbName, fun
-            ({change, Change}, Acc) ->
-                case has_valid_rep_id(Change) of
-                true ->
-                    ok = gen_server:call(
-                        Server, {rep_db_update, DbName, Change}, infinity);
-                false ->
-                    ok
-                end,
-                {ok, Acc};
-            ({stop, EndSeq}, Acc) ->
-                ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}, infinity),
-                {ok, Acc};
-            (_, Acc) ->
-                {ok, Acc}
-            end,
-            nil,
-            #changes_args{
-                include_docs = true,
-                feed = "longpoll",
-                since = Since,
-                filter = main_only,
-                timeout = infinity
-                }
-            )
-        end),
-    Pid;
 changes_feed_loop(DbName, Since) ->
-    ensure_rep_db_exists(DbName),
     Server = self(),
     spawn_link(fun() ->
         UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
@@ -341,15 +312,13 @@ has_valid_rep_id(_Else) ->
 db_update_notifier() ->
     Server = self(),
     {ok, Notifier} = couch_db_update_notifier:start_link(fun
-        ({Event, ShardDbName})
+        ({Event, DbName})
                 when Event == created; Event == updated; Event == deleted ->
-            DbName = mem3:dbname(ShardDbName),
             IsRepDb = is_replicator_db(DbName),
             case Event of
                 created when IsRepDb ->
                     ensure_rep_ddoc_exists(DbName);
                 updated when IsRepDb ->
-                    ensure_rep_ddoc_exists(DbName),
                     Msg = {resume_scan, DbName},
                     ok = gen_server:call(Server, Msg, infinity);
                 deleted when IsRepDb ->
@@ -404,7 +373,7 @@ process_update(State, DbName, {Change}) ->
 
 
 is_owner(<<"shards/", _/binary>>=DbName, DocId) ->
-    mem3_util:owner(DbName, DocId);
+    mem3_util:owner(mem3:dbname(DbName), DocId);
 is_owner(_, _) ->
     true.
 
@@ -486,7 +455,6 @@ maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
         update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
     end.
 
-%% note to self: this is markedly diff from mem3_rep_manager
 start_replication(Rep, Wait) ->
     ok = timer:sleep(Wait * 1000),
     case (catch couch_replicator:async_replicate(Rep)) of
@@ -572,8 +540,7 @@ maybe_retry_replication(RepState, Error, State) ->
 
 
 stop_all_replications() ->
-    couch_log:notice("Stopping all ongoing replications because the replicator"
-        " database was deleted or changed", []),
+    couch_log:notice("Stopping all ongoing replications", []),
     ets:foldl(
         fun({_, RepId}, _) ->
             couch_replicator:cancel_replication(RepId)
@@ -638,10 +605,9 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
         save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
     end.
 
-
 open_rep_doc(<<"shards/", _/binary>>=ShardDbName, DocId) ->
     defer_call(fun() ->
-        fabric:open_doc(mem3:dbname(ShardDbName), DocId, [])
+        fabric:open_doc(mem3:dbname(ShardDbName), DocId, [ejson_body])
     end);
 open_rep_doc(DbName, DocId) ->
     {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
@@ -653,7 +619,7 @@ open_rep_doc(DbName, DocId) ->
 
 save_rep_doc(<<"shards/", _/binary>>=DbName, Doc) ->
     defer_call(fun() ->
-        fabric:update_doc(DbName, Doc, [?CTX])
+        fabric:update_doc(mem3:dbname(DbName), Doc, [?CTX])
     end);
 save_rep_doc(DbName, Doc) ->
     {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
@@ -684,7 +650,6 @@ defer_call(Fun) ->
             exit(Error)
     end.
 
-
 % RFC3339 timestamps.
 % Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
 timestamp() ->
@@ -705,6 +670,9 @@ zone(Hr, Min) ->
     io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
 
 
+ensure_rep_db_exists(<<"shards/", _/binary>>=DbName) ->
+    ensure_rep_ddoc_exists(DbName),
+    ok;
 ensure_rep_db_exists(DbName) ->
     Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
         {ok, Db0} ->
@@ -716,9 +684,16 @@ ensure_rep_db_exists(DbName) ->
     ensure_rep_ddoc_exists(DbName),
     {ok, Db}.
 
-
 ensure_rep_ddoc_exists(RepDb) ->
     DDocId = <<"_design/_replicator">>,
+    case local_docid(RepDb, DDocId) of
+	true ->
+	    ensure_rep_ddoc_exists(RepDb, DDocId);
+	false ->
+	    ok
+    end.
+
+ensure_rep_ddoc_exists(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {ok, _Doc} ->
             ok;
@@ -738,7 +713,6 @@ ensure_rep_ddoc_exists(RepDb) ->
             end
     end.
 
-
 % pretty-print replication id
 pp_rep_id(#rep{id = RepId}) ->
     pp_rep_id(RepId);
@@ -855,8 +829,9 @@ scan_all_dbs(Server) when is_pid(Server) ->
             false ->
                 case is_replicator_db(DbName) of
                     true ->
-                        ensure_rep_ddoc_exists(DbName),
-                        gen_server:call(Server, {resume_scan, DbName});
+			couch_log:notice("Scanning local shards of ~s", [DbName]),
+			[gen_server:call(Server, {resume_scan, S#shard.name}) ||
+			    S <- local_shards(DbName)];
                     false ->
                         ok
                 end
@@ -866,9 +841,8 @@ scan_all_dbs(Server) when is_pid(Server) ->
     end),
     couch_db:close(Db).
 
-is_replicator_db(Name) ->
-    DbName = mem3:dbname(Name),
-    case lists:last(binary:split(DbName, <<"/">>, [global])) of
+is_replicator_db(DbName) ->
+    case lists:last(binary:split(mem3:dbname(DbName), <<"/">>, [global])) of
         <<"_replicator">> ->
             true;
         _ ->
@@ -894,3 +868,12 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
         Else ->
             Else
     end.
+
+local_shards(DbName) ->
+    [S || S <- mem3:shards(DbName), S#shard.node =:= node()].
+
+local_docid(<<"shards/", _/binary>>=DbName, DocId) ->
+    [Shard|_] = mem3:shards(mem3:dbname(DbName), DocId),
+    Shard#shard.name =:= DbName;
+local_docid(_, _) ->
+    true.


Mime
View raw message