couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [1/2] couch-replicator commit: updated refs/heads/1843-feature-bigcouch to 942eb2d
Date Thu, 08 May 2014 16:30:28 GMT
Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/1843-feature-bigcouch 4965655ca -> 942eb2d22 (forced update)


Support clustered and non-clustered _replicator databases


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/9791cc1a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/9791cc1a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/9791cc1a

Branch: refs/heads/1843-feature-bigcouch
Commit: 9791cc1a70fdacbaaaec5b1fccaab40042e50896
Parents: 9e261d5
Author: Robert Newson <rnewson@apache.org>
Authored: Wed Apr 30 11:15:26 2014 +0100
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed May 7 15:56:45 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.app.src     |   1 -
 src/couch_replicator_manager.erl | 131 +++++++++-------------------------
 2 files changed, 34 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/9791cc1a/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator.app.src b/src/couch_replicator.app.src
index e9b2b04..0acce97 100644
--- a/src/couch_replicator.app.src
+++ b/src/couch_replicator.app.src
@@ -36,7 +36,6 @@
         kernel,
         stdlib,
         couch_log,
-        fabric,
         mem3,
         couch
     ]}

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/9791cc1a/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index eb8b0ae..8e1f8a2 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -27,6 +27,7 @@
 -export([handle_config_change/5]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator.hrl").
 -include("couch_replicator_js_functions.hrl").
 
@@ -130,6 +131,7 @@ init(_) ->
     ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
     % Automatically start node local changes feed loop
     LocalRepDb = ?l2b(config:get("replicator", "db", "_replicator")),
+    ensure_rep_db_exists(LocalRepDb),
     Pid = changes_feed_loop(LocalRepDb, 0),
     {ok, #state{
         db_notifier = db_update_notifier(),
@@ -180,6 +182,7 @@ handle_call({resume_scan, DbName}, _From, State) ->
         [] -> 0;
         [{DbName, EndSeq}] -> EndSeq
     end,
+    ensure_rep_ddoc_exists(DbName),
     Pid = changes_feed_loop(DbName, Since),
     couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
     {reply, ok, State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}};
@@ -267,39 +270,7 @@ terminate(_Reason, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-changes_feed_loop(<<"shards/", _/binary>>=DbName, Since) ->
-    Server = self(),
-    Pid = spawn_link(
-        fun() ->
-            fabric:changes(DbName, fun
-            ({change, Change}, Acc) ->
-                case has_valid_rep_id(Change) of
-                true ->
-                    ok = gen_server:call(
-                        Server, {rep_db_update, DbName, Change}, infinity);
-                false ->
-                    ok
-                end,
-                {ok, Acc};
-            ({stop, EndSeq}, Acc) ->
-                ok = gen_server:call(Server, {rep_db_checkpoint, DbName, EndSeq}, infinity),
-                {ok, Acc};
-            (_, Acc) ->
-                {ok, Acc}
-            end,
-            nil,
-            #changes_args{
-                include_docs = true,
-                feed = "longpoll",
-                since = Since,
-                filter = main_only,
-                timeout = infinity
-                }
-            )
-        end),
-    Pid;
 changes_feed_loop(DbName, Since) ->
-    ensure_rep_db_exists(DbName),
     Server = self(),
     spawn_link(fun() ->
         UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
@@ -341,15 +312,13 @@ has_valid_rep_id(_Else) ->
 db_update_notifier() ->
     Server = self(),
     {ok, Notifier} = couch_db_update_notifier:start_link(fun
-        ({Event, ShardDbName})
+        ({Event, DbName})
                 when Event == created; Event == updated; Event == deleted ->
-            DbName = mem3:dbname(ShardDbName),
             IsRepDb = is_replicator_db(DbName),
             case Event of
                 created when IsRepDb ->
                     ensure_rep_ddoc_exists(DbName);
                 updated when IsRepDb ->
-                    ensure_rep_ddoc_exists(DbName),
                     Msg = {resume_scan, DbName},
                     ok = gen_server:call(Server, Msg, infinity);
                 deleted when IsRepDb ->
@@ -404,7 +373,7 @@ process_update(State, DbName, {Change}) ->
 
 
 is_owner(<<"shards/", _/binary>>=DbName, DocId) ->
-    mem3_util:owner(DbName, DocId);
+    mem3_util:owner(mem3:dbname(DbName), DocId);
 is_owner(_, _) ->
     true.
 
@@ -486,7 +455,6 @@ maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
         update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
     end.
 
-%% note to self: this is markedly diff from mem3_rep_manager
 start_replication(Rep, Wait) ->
     ok = timer:sleep(Wait * 1000),
     case (catch couch_replicator:async_replicate(Rep)) of
@@ -572,8 +540,7 @@ maybe_retry_replication(RepState, Error, State) ->
 
 
 stop_all_replications() ->
-    couch_log:notice("Stopping all ongoing replications because the replicator"
-        " database was deleted or changed", []),
+    couch_log:notice("Stopping all ongoing replications", []),
     ets:foldl(
         fun({_, RepId}, _) ->
             couch_replicator:cancel_replication(RepId)
@@ -638,11 +605,6 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
         save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
     end.
 
-
-open_rep_doc(<<"shards/", _/binary>>=ShardDbName, DocId) ->
-    defer_call(fun() ->
-        fabric:open_doc(mem3:dbname(ShardDbName), DocId, [])
-    end);
 open_rep_doc(DbName, DocId) ->
     {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
     try
@@ -651,10 +613,6 @@ open_rep_doc(DbName, DocId) ->
         couch_db:close(Db)
     end.
 
-save_rep_doc(<<"shards/", _/binary>>=DbName, Doc) ->
-    defer_call(fun() ->
-        fabric:update_doc(DbName, Doc, [?CTX])
-    end);
 save_rep_doc(DbName, Doc) ->
     {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
     try
@@ -663,28 +621,6 @@ save_rep_doc(DbName, Doc) ->
         couch_db:close(Db)
     end.
 
-defer_call(Fun) ->
-    {Pid, Ref} = erlang:spawn_monitor(fun() ->
-        Res = try
-            Fun()
-        catch
-            Type:Reason ->
-                exit({exit_err, Type, Reason})
-        end,
-        exit({exit_ok, Res})
-    end),
-    receive
-        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
-            Resp;
-        {'DOWN', Ref, process, Pid, {exit_err, throw, Error}} ->
-            throw(Error);
-        {'DOWN', Ref, process, Pid, {exit_err, error, Error}} ->
-            erlang:error(Error);
-        {'DOWN', Ref, process, Pid, {exit_err, exit, Error}} ->
-            exit(Error)
-    end.
-
-
 % RFC3339 timestamps.
 % Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
 timestamp() ->
@@ -705,6 +641,9 @@ zone(Hr, Min) ->
     io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
 
 
+ensure_rep_db_exists(<<"shards/", _/binary>>=DbName) ->
+    ensure_rep_ddoc_exists(DbName),
+    ok;
 ensure_rep_db_exists(DbName) ->
     Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
         {ok, Db0} ->
@@ -716,9 +655,16 @@ ensure_rep_db_exists(DbName) ->
     ensure_rep_ddoc_exists(DbName),
     {ok, Db}.
 
-
 ensure_rep_ddoc_exists(RepDb) ->
     DDocId = <<"_design/_replicator">>,
+    case mem3:belongs(RepDb, DDocId) of
+	true ->
+	    ensure_rep_ddoc_exists(RepDb, DDocId);
+	false ->
+	    ok
+    end.
+
+ensure_rep_ddoc_exists(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {ok, _Doc} ->
             ok;
@@ -738,7 +684,6 @@ ensure_rep_ddoc_exists(RepDb) ->
             end
     end.
 
-
 % pretty-print replication id
 pp_rep_id(#rep{id = RepId}) ->
     pp_rep_id(RepId);
@@ -844,31 +789,23 @@ strip_credentials({Props}) ->
     {lists:keydelete(<<"oauth">>, 1, Props)}.
 
 scan_all_dbs(Server) when is_pid(Server) ->
-    {ok, Db} = mem3_util:ensure_exists(config:get("mem3", "shard_db", "dbs")),
-    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = get_json_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                case is_replicator_db(DbName) of
-                    true ->
-                        ensure_rep_ddoc_exists(DbName),
-                        gen_server:call(Server, {resume_scan, DbName});
-                    false ->
-                        ok
-                end
-            end
-        end;
-        (_, _) -> ok
-    end),
-    couch_db:close(Db).
-
-is_replicator_db(Name) ->
-    DbName = mem3:dbname(Name),
-    case lists:last(binary:split(DbName, <<"/">>, [global])) of
+    Root = config:get("couchdb", "database_dir", "."),
+    NormRoot = couch_util:normpath(Root),
+    filelib:fold_files(Root, "_replicator(\\.[0-9]{10,})?.couch$", true,
+        fun(Filename, _) ->
+	    % shamelessly stolen from couch_server.erl
+            NormFilename = couch_util:normpath(Filename),
+            case NormFilename -- NormRoot of
+                [$/ | RelativeFilename] -> ok;
+                RelativeFilename -> ok
+            end,
+            DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
+	    gen_server:call(Server, {resume_scan, DbName}),
+	    ok
+	end, ok).
+
+is_replicator_db(DbName) ->
+    case lists:last(binary:split(mem3:dbname(DbName), <<"/">>, [global])) of
         <<"_replicator">> ->
             true;
         _ ->


Mime
View raw message