couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject svn commit: r965331 - /couchdb/trunk/src/couchdb/couch_rep.erl
Date Mon, 19 Jul 2010 00:10:11 GMT
Author: rnewson
Date: Mon Jul 19 00:10:11 2010
New Revision: 965331

URL: http://svn.apache.org/viewvc?rev=965331&view=rev
Log:
COUCHDB-810: Adds port to replication checkpoints.

New replication checkpoints now include the port number, which allows for efficient replication
between multiple CouchDB instances running on the same machine.

Old replication checkpoints are still recognized (so a full replication is not triggered), and they are
automatically migrated to the new checkpoint format.

Thanks to Randall Leeds for the patch.

Modified:
    couchdb/trunk/src/couchdb/couch_rep.erl

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=965331&r1=965330&r2=965331&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Mon Jul 19 00:10:11 2010
@@ -19,6 +19,8 @@
 
 -include("couch_db.hrl").
 
+-define(REP_ID_VERSION, 2).
+
 -record(state, {
     changes_feed,
     missing_revs,
@@ -59,7 +61,9 @@ replicate(Source, Target) when is_binary
 
 %% function handling POST to _replicate
 replicate({Props}=PostBody, UserCtx) ->
-    {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
+    BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
+    Extension = maybe_append_options(
+        [<<"continuous">>, <<"create_target">>], Props),
     Replicator = {BaseId ++ Extension,
         {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
         temporary,
@@ -144,8 +148,9 @@ do_init([RepId, {PostProps}, UserCtx] = 
 
     _ ->
         % Replication using the _changes API (DB sequence update numbers).
-        SourceLog = open_replication_log(Source, RepId),
-        TargetLog = open_replication_log(Target, RepId),
+
+        [SourceLog, TargetLog] = find_replication_logs(
+            [Source, Target], RepId, {PostProps}, UserCtx),
     
         {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
 
@@ -451,13 +456,23 @@ maybe_append_options(Options, Props) ->
         end
     end, [], Options).
 
-make_replication_id({Props}, UserCtx) ->
-    %% funky algorithm to preserve backwards compatibility
+% Versioned clauses for generating replication ids
+% If a change is made to how replications are identified
+% add a new clause and increase ?REP_ID_VERSION at the top
+make_replication_id({Props}, UserCtx, 2) ->
+    {ok, HostName} = inet:gethostname(),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+    Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+    maybe_append_filters({Props}, [HostName, Port, Src, Tgt]);
+make_replication_id({Props}, UserCtx, 1) ->
     {ok, HostName} = inet:gethostname(),
-    % Port = mochiweb_socket_server:get(couch_httpd, port),
     Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
     Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
-    Base = [HostName, Src, Tgt] ++
+    maybe_append_filters({Props}, [HostName, Src, Tgt]).
+
+maybe_append_filters({Props}, Base) ->
+    Base2 = Base ++ 
         case couch_util:get_value(<<"filter">>, Props) of
         undefined ->
             case couch_util:get_value(<<"doc_ids">>, Props) of
@@ -469,9 +484,7 @@ make_replication_id({Props}, UserCtx) ->
         Filter ->
             [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})]
         end,
-    Extension = maybe_append_options(
-        [<<"continuous">>, <<"create_target">>], Props),
-    {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
 
 maybe_add_trailing_slash(Url) ->
     re:replace(Url, "[^/]$", "&/", [{return, list}]).
@@ -493,26 +506,52 @@ get_rep_endpoint(_UserCtx, <<"https://",
 get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
     {local, DbName, UserCtx}.
 
-open_replication_log(#http_db{}=Db, RepId) ->
-    DocId = ?LOCAL_DOC_PREFIX ++ RepId,
-    Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+find_replication_logs(Logs, RepId, {Props}, UserCtx) ->
+    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+    fold_replication_logs(Logs, ?REP_ID_VERSION,
+        LogId, LogId, {Props}, UserCtx, []).
+
+% Accumulate the replication logs
+% Falls back to older log document ids and migrates them
+fold_replication_logs([], _Vsn, _LogId, _NewId, {_Props}, _UserCtx, Acc) ->
+    lists:reverse(Acc);
+fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
+        {Props}, UserCtx, Acc) ->
+    case open_replication_log(Db, LogId) of
+    {error, not_found} when Vsn > 1 ->
+        OldRepId = make_replication_id({Props}, UserCtx, Vsn - 1),
+        fold_replication_logs(Dbs, Vsn - 1,
+            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, {Props}, UserCtx, Acc);
+    {error, not_found} ->
+        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+            {Props}, UserCtx, [#doc{id=NewId}|Acc]);
+    {ok, Doc} when LogId =:= NewId ->
+        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+            {Props}, UserCtx, [Doc|Acc]);
+    {ok, Doc} ->
+        MigratedLog = #doc{id=NewId,body=Doc#doc.body},
+        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+            {Props}, UserCtx, [MigratedLog|Acc])
+    end.
+
+open_replication_log(#http_db{}=Db, DocId) ->
+    Req = Db#http_db{resource=couch_util:url_encode(?b2l(DocId))},
     case couch_rep_httpc:request(Req) of
     {[{<<"error">>, _}, {<<"reason">>, _}]} ->
         ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
-        #doc{id=?l2b(DocId)};
+        {error, not_found};
     Doc ->
         ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
-        couch_doc:from_json_obj(Doc)
+        {ok, couch_doc:from_json_obj(Doc)}
     end;
-open_replication_log(Db, RepId) ->
-    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+open_replication_log(Db, DocId) ->
     case couch_db:open_doc(Db, DocId, []) of
     {ok, Doc} ->
         ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
-        Doc;
+        {ok, Doc};
     _ ->
         ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
-        #doc{id=DocId}
+        {error, not_found}
     end.
 
 open_db(Props, UserCtx) ->



Mime
View raw message