couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r992759 - in /couchdb/branches/new_replicator/src/couchdb: Makefile.am couch_api_wrap.hrl couch_httpd_rep.erl couch_replicate.erl couch_replicator_utils.erl
Date Sun, 05 Sep 2010 11:34:45 GMT
Author: fdmanana
Date: Sun Sep  5 11:34:45 2010
New Revision: 992759

URL: http://svn.apache.org/viewvc?rev=992759&view=rev
Log:
New replicator: small refactoring to allow simpler integration with the replicator db (in trunk only).

Added:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl
Modified:
    couchdb/branches/new_replicator/src/couchdb/Makefile.am
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=992759&r1=992758&r2=992759&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Sun Sep  5 11:34:45 2010
@@ -80,6 +80,7 @@ source_files = \
     couch_db_updater.erl \
     couch_work_queue.erl \
     couch_replicate.erl \
+    couch_replicator_utils.erl \
     couch_replication_notifier.erl \
     couch_httpd_rep.erl \
     couch_api_wrap.erl \
@@ -144,6 +145,7 @@ compiled_files = \
     couch_db_updater.beam \
     couch_work_queue.beam \
     couch_replicate.beam \
+    couch_replicator_utils.beam \
     couch_replication_notifier.beam \
     couch_httpd_rep.beam \
     couch_api_wrap.beam \

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=992759&r1=992758&r2=992759&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Sun Sep  5 11:34:45 2010
@@ -26,3 +26,12 @@
     consumer_secret,
     signature_method
 }).
+
+-record(rep, {
+    id,
+    source,
+    target,
+    options,
+    user_ctx,
+    doc
+}).

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=992759&r1=992758&r2=992759&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Sun Sep  5 11:34:45 2010
@@ -24,12 +24,10 @@
 -export([handle_req/1]).
 
 
-handle_req(#httpd{method='POST'} = Req) ->
-    {PostBody} = couch_httpd:json_body_obj(Req),
-    SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
-    TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
-    Options = convert_options(PostBody),
-    case couch_replicate:replicate(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
+    RepDoc = couch_httpd:json_body_obj(Req),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
+    case couch_replicate:replicate(Rep) of
     {error, Reason} ->
         try
             send_json(Req, 500, {[{error, Reason}]})
@@ -44,77 +42,6 @@ handle_req(#httpd{method='POST'} = Req) 
     {ok, {HistoryResults}} ->
         send_json(Req, {[{ok, true} | HistoryResults]})
     end;
+
 handle_req(Req) ->
     send_method_not_allowed(Req, "POST").
-
-
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:last(Url) of
-    $/ ->
-        Url;
-    _ ->
-        Url ++ "/"
-    end.
-
-parse_rep_db({Props}) ->
-    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
-    {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
-    Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
-    
-    case couch_util:get_value(<<"oauth">>, AuthProps) of
-    undefined ->
-        OAuth = nil;
-    {OauthProps} -> 
-        OAuth = #oauth{
-            consumer_key = 
-                ?b2l(couch_util:get_value(<<"consumer_key">>, OauthProps)),
-            token = 
-                ?b2l(couch_util:get_value(<<"token">>, OauthProps)),
-            token_secret = 
-                ?b2l(couch_util:get_value(<<"token_secret">>, OauthProps)),
-            consumer_secret = 
-                ?b2l(couch_util:get_value(<<"consumer_secret">>, OauthProps)),
-            signature_method = 
-                case couch_util:get_value(<<"signature_method">>, OauthProps) of
-                undefined ->        hmac_sha1;
-                <<"PLAINTEXT">> ->  plaintext;
-                <<"HMAC-SHA1">> ->  hmac_sha1;
-                <<"RSA-SHA1">> ->   rsa_sha1
-                end
-        }
-    end,
-    
-    #httpdb{
-        url = Url,
-        oauth = OAuth,
-        headers = Headers
-    };
-parse_rep_db(<<"http://", _/binary>> = Url) ->
-    parse_rep_db({[{<<"url">>, Url}]});
-parse_rep_db(<<"https://", _/binary>> = Url) ->
-    parse_rep_db({[{<<"url">>, Url}]});
-parse_rep_db(<<DbName/binary>>) ->
-    DbName.
-
-
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    [{doc_ids, V} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=992759&r1=992758&r2=992759&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sun Sep  5 11:34:45 2010
@@ -14,7 +14,7 @@
 -behaviour(gen_server).
 
 % public API
--export([replicate/4]).
+-export([replicate/1]).
 
 % gen_server callbacks
 -export([init/1, terminate/2, code_change/3]).
@@ -37,8 +37,7 @@
     }).
 
 -record(rep_state, {
-    rep_id,
-    rep_options,
+    rep_details,
     source_name,
     target_name,
     source,
@@ -67,20 +66,19 @@
     }).
 
 
-replicate(Src, Tgt, Options, UserCtx) ->
-    RepId = make_replication_id(Src, Tgt, UserCtx, Options),
+replicate(#rep{id = RepId, options = Options} = Rep) ->
     case couch_util:get_value(cancel, Options, false) of
     true ->
         end_replication(RepId);
     false ->
         {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(RepId, Src, Tgt, Options, UserCtx),
+        Result = do_replication_loop(Rep),
         couch_replication_notifier:stop(Listener),
         Result
     end.
 
 
-do_replication_loop(RepId, Src, Tgt, Options, UserCtx) ->
+do_replication_loop(#rep{options = Options, source = Src} = Rep) ->
     DocIds = couch_util:get_value(doc_ids, Options),
     Continuous = couch_util:get_value(continuous, Options, false),
     Seq = case {DocIds, Continuous} of
@@ -89,33 +87,33 @@ do_replication_loop(RepId, Src, Tgt, Opt
     _ ->
         undefined
     end,
-    do_replication_loop(RepId, Src, Tgt, Options, UserCtx, Seq).
+    do_replication_loop(Rep, Seq).
 
-do_replication_loop({BaseId, _} = RepId, Src, Tgt, Options, UserCtx, Seq) ->
-    case start_replication(RepId, Src, Tgt, Options, UserCtx) of
+do_replication_loop(#rep{id = {BaseId,_} = Id, options = Options} = Rep, Seq) ->
+    case start_replication(Rep) of
     {ok, _Pid} ->
         case couch_util:get_value(continuous, Options, false) of
         true ->
             {ok, {continuous, ?l2b(BaseId)}};
         false ->
-            Result = wait_for_result(RepId),
-            maybe_retry(Result, RepId, Src, Tgt, Options, UserCtx, Seq)
+            Result = wait_for_result(Id),
+            maybe_retry(Result, Rep, Seq)
         end;
     Error ->
         Error
     end.
 
 
-maybe_retry(RepResult, _RepId, _Src, _Tgt, _Options, _UserCtx, undefined) ->
+maybe_retry(RepResult, _Rep, undefined) ->
     RepResult;
-maybe_retry({ok, {Props}} = Result, RepId, Src, Tgt, Options, UserCtx, Seq) ->
+maybe_retry({ok, {Props}} = Result, Rep, Seq) ->
     case couch_util:get_value(source_last_seq, Props) >= Seq of
     true ->
         Result;
     false ->
-        do_replication_loop(RepId, Src, Tgt, Options, UserCtx, Seq)
+        do_replication_loop(Rep, Seq)
     end;
-maybe_retry(RepResult, _RepId, _Src, _Tgt, _Options, _UserCtx, _Seq) ->
+maybe_retry(RepResult, _Rep, _Seq) ->
     RepResult.
 
 
@@ -128,12 +126,11 @@ last_seq(DbName) ->
     Seq.
 
 
-start_replication({BaseId, Extension} = RepId, Src, Tgt, Options, UserCtx) ->
+start_replication(#rep{id = {BaseId, Extension}} = Rep) ->
     RepChildId = BaseId ++ Extension,
     ChildSpec = {
         RepChildId,
-        {gen_server, start_link,
-           [?MODULE, [RepId, Src, Tgt, Options, UserCtx], []]},
+        {gen_server, start_link, [?MODULE, Rep, []]},
         transient,
         1,
         worker,
@@ -214,14 +211,14 @@ init(InitArgs) ->
         {stop, Error}
     end.
 
-do_init([RepId, Src, Tgt, Options, UserCtx]) ->
+do_init(#rep{options = Options} = Rep) ->
     process_flag(trap_exit, true),
 
     #rep_state{
         source = Source,
         target = Target,
         start_seq = StartSeq
-    } = State = init_state(RepId, Src, Tgt, Options, UserCtx),
+    } = State = init_state(Rep),
 
     {ok, MissingRevsQueue} = couch_work_queue:new(
         [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
@@ -414,7 +411,7 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-terminate(normal, #rep_state{rep_id = RepId} = State) ->
+terminate(normal, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     terminate_cleanup(State),
     couch_replication_notifier:notify({finished, RepId, get_result(State)});
 
@@ -422,7 +419,7 @@ terminate(shutdown, State) ->
     % cancelled replication throught ?MODULE:end_replication/1
     terminate_cleanup(State);
 
-terminate(Reason, #rep_state{rep_id = RepId} = State) ->
+terminate(Reason, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     terminate_cleanup(State),
     couch_replication_notifier:notify({error, RepId, Reason}).
 
@@ -432,7 +429,7 @@ terminate_cleanup(#rep_state{source = So
     couch_api_wrap:db_close(Target).
 
 
-start_timer(#rep_state{rep_options = Options} = State) ->
+start_timer(#rep_state{rep_details = #rep{options = Options}} = State) ->
     case couch_util:get_value(doc_ids, Options) of
     undefined ->
         After = checkpoint_interval(State),
@@ -454,8 +451,8 @@ cancel_timer(#rep_state{timer = Timer}) 
     {ok, cancel} = timer:cancel(Timer).
 
 
-get_result(#rep_state{stats = Stats, rep_options = Options} = State) ->
-    case couch_util:get_value(doc_ids, Options) of
+get_result(#rep_state{stats = Stats, rep_details = Rep} = State) ->
+    case couch_util:get_value(doc_ids, Rep#rep.options) of
     undefined ->
         State#rep_state.checkpoint_history;
     _DocIdList ->
@@ -469,7 +466,12 @@ get_result(#rep_state{stats = Stats, rep
     end.
 
 
-init_state({BaseId, _Ext} = RepId, Src, Tgt, Options, UserCtx) ->
+init_state(Rep) ->
+    #rep{
+        id = {BaseId, _Ext},
+        source = Src, target = Tgt,
+        options = Options, user_ctx = UserCtx
+    } = Rep,
     {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
     {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
         couch_util:get_value(create_target, Options, false)),
@@ -489,8 +491,7 @@ init_state({BaseId, _Ext} = RepId, Src, 
     {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
-        rep_id = RepId,
-        rep_options = Options,
+        rep_details = Rep,
         source_name = couch_api_wrap:db_uri(Source),
         target_name = couch_api_wrap:db_uri(Target),
         source = Source,
@@ -749,51 +750,6 @@ commit_to_both(Source, Target) ->
     {SourceStartTime, TargetStartTime}.
 
 
-make_replication_id(Source, Target, UserCtx, Options) ->
-    %% funky algorithm to preserve backwards compatibility
-    {ok, HostName} = inet:gethostname(),
-    % Port = mochiweb_socket_server:get(couch_httpd, port),
-    Src = get_rep_endpoint(UserCtx, Source),
-    Tgt = get_rep_endpoint(UserCtx, Target),
-    Base = [HostName, Src, Tgt] ++
-        case couch_util:get_value(filter, Options) of
-        undefined ->
-            case couch_util:get_value(doc_ids, Options) of
-            undefined ->
-                [];
-            DocIds ->
-                [DocIds]
-            end;
-        Filter ->
-            [Filter, couch_util:get_value(query_params, Options, {[]})]
-        end,
-    Extension = maybe_append_options([continuous, create_target], Options),
-    {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
-
-
-maybe_append_options(Options, RepOptions) ->
-    lists:foldl(fun(Option, Acc) ->
-        Acc ++
-        case couch_util:get_value(Option, RepOptions, false) of
-        true ->
-            "+" ++ atom_to_list(Option);
-        false ->
-            ""
-        end
-    end, [], Options).
-
-
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
-    case OAuth of
-    nil ->
-        {remote, Url, Headers};
-    {OAuth} ->
-        {remote, Url, Headers, OAuth}
-    end;
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
-
-
 compare_replication_logs(SrcDoc, TgtDoc) ->
     #doc{body={RepRecProps}} = SrcDoc,
     #doc{body={RepRecPropsTgt}} = TgtDoc,

Added: couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl?rev=992759&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_utils.erl Sun Sep  5 11:34:45 2010
@@ -0,0 +1,150 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_utils).
+
+-export([parse_rep_doc/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+
+parse_rep_doc({Props} = RepObj, UserCtx) ->
+    Source = parse_rep_db(get_value(<<"source">>, Props)),
+    Target = parse_rep_db(get_value(<<"target">>, Props)),
+    Options = convert_options(Props),
+    Rep = #rep{
+        id = make_replication_id(Source, Target, UserCtx, Options),
+        source = Source,
+        target = Target,
+        options = Options,
+        user_ctx = UserCtx,
+        doc = RepObj
+    },
+    {ok, Rep}.
+
+
+make_replication_id(Source, Target, UserCtx, Options) ->
+    %% funky algorithm to preserve backwards compatibility
+    {ok, HostName} = inet:gethostname(),
+    % Port = mochiweb_socket_server:get(couch_httpd, port),
+    Src = get_rep_endpoint(UserCtx, Source),
+    Tgt = get_rep_endpoint(UserCtx, Target),
+    Base = [HostName, Src, Tgt] ++
+        case get_value(filter, Options) of
+        undefined ->
+            case get_value(doc_ids, Options) of
+            undefined ->
+                [];
+            DocIds ->
+                [DocIds]
+            end;
+        Filter ->
+            [Filter, get_value(query_params, Options, {[]})]
+        end,
+    Extension = maybe_append_options([continuous, create_target], Options),
+    {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+
+
+maybe_append_options(Options, RepOptions) ->
+    lists:foldl(fun(Option, Acc) ->
+        Acc ++
+        case get_value(Option, RepOptions, false) of
+        true ->
+            "+" ++ atom_to_list(Option);
+        false ->
+            ""
+        end
+    end, [], Options).
+
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+    case OAuth of
+    nil ->
+        {remote, Url, Headers};
+    {OAuth} ->
+        {remote, Url, Headers, OAuth}
+    end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+
+parse_rep_db({Props}) ->
+    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+    Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
+    OAuth = case get_value(<<"oauth">>, AuthProps) of
+    undefined ->
+        nil;
+    {OauthProps} ->
+        #oauth{
+            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+            token = ?b2l(get_value(<<"token">>, OauthProps)),
+            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
+            signature_method =
+                case get_value(<<"signature_method">>, OauthProps) of
+                undefined ->        hmac_sha1;
+                <<"PLAINTEXT">> ->  plaintext;
+                <<"HMAC-SHA1">> ->  hmac_sha1;
+                <<"RSA-SHA1">> ->   rsa_sha1
+                end
+        }
+    end,
+    #httpdb{
+        url = Url,
+        oauth = OAuth,
+        headers = Headers
+    };
+parse_rep_db(<<"http://", _/binary>> = Url) ->
+    parse_rep_db({[{<<"url">>, Url}]});
+parse_rep_db(<<"https://", _/binary>> = Url) ->
+    parse_rep_db({[{<<"url">>, Url}]});
+parse_rep_db(<<DbName/binary>>) ->
+    DbName.
+
+
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+    maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+    case lists:last(Url) of
+    $/ ->
+        Url;
+    _ ->
+        Url ++ "/"
+    end.
+
+
+convert_options([])->
+    [];
+convert_options([{<<"cancel">>, V} | R]) ->
+    [{cancel, V} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | R]) ->
+    [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | R]) ->
+    [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+    [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+    [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    [{doc_ids, V} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
+    convert_options(R).
+



Mime
View raw message