couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1035780 - in /couchdb/trunk/src/couchdb: couch_changes.erl couch_rep.erl couch_rep_changes_feed.erl couch_rep_reader.erl couch_rep_writer.erl
Date Tue, 16 Nov 2010 20:12:05 GMT
Author: fdmanana
Date: Tue Nov 16 20:12:05 2010
New Revision: 1035780

URL: http://svn.apache.org/viewvc?rev=1035780&view=rev
Log:
Replicator: use the new builtin _doc_ids filter for the by doc IDs replication.
This reduces code complexity and allows for continuous by doc IDs replication.

Modified:
    couchdb/trunk/src/couchdb/couch_changes.erl
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl

Modified: couchdb/trunk/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_changes.erl?rev=1035780&r1=1035779&r2=1035780&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_changes.erl (original)
+++ couchdb/trunk/src/couchdb/couch_changes.erl Tue Nov 16 20:12:05 2010
@@ -119,6 +119,8 @@ os_filter_fun(FilterName, Style, Req, Db
             "filter parameter must be of the form `designname/filtername`"})
     end.
 
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+    filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style);
 builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
     {Props} = couch_httpd:json_body_obj(Req),
     DocIds =  couch_util:get_value(<<"doc_ids">>, Props, nil),

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=1035780&r1=1035779&r2=1035780&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Tue Nov 16 20:12:05 2010
@@ -53,7 +53,6 @@
     committed_seq = 0,
 
     stats = nil,
-    doc_ids = nil,
     rep_doc = nil
 }).
 
@@ -129,7 +128,6 @@ do_init([RepId, {PostProps} = RepDoc, Us
     SourceProps = couch_util:get_value(<<"source">>, PostProps),
     TargetProps = couch_util:get_value(<<"target">>, PostProps),
 
-    DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
     Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
     CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
 
@@ -143,40 +141,16 @@ do_init([RepId, {PostProps} = RepDoc, Us
 
     maybe_set_triggered(RepDoc, RepId),
 
-    case DocIds of
-    List when is_list(List) ->
-        % Fast replication using only a list of doc IDs to replicate.
-        % Replication sessions, checkpoints and logs are not created
-        % since the update sequence number of the source DB is not used
-        % for determining which documents are copied into the target DB.
-        SourceLog = nil,
-        TargetLog = nil,
+    [SourceLog, TargetLog] = find_replication_logs(
+        [Source, Target], RepId, {PostProps}, UserCtx),
+    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
 
-        StartSeq = nil,
-        History = nil,
-
-        ChangesFeed = nil,
-        MissingRevs = nil,
-
-        {ok, Reader} =
-        couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
-
-    _ ->
-        % Replication using the _changes API (DB sequence update numbers).
-
-        [SourceLog, TargetLog] = find_replication_logs(
-            [Source, Target], RepId, {PostProps}, UserCtx),
-    
-        {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
-
-        {ok, ChangesFeed} =
+    {ok, ChangesFeed} =
         couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
-        {ok, MissingRevs} =
+    {ok, MissingRevs} =
         couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
-        {ok, Reader} =
-        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
-    end,
-
+    {ok, Reader} =
+        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
     {ok, Writer} =
     couch_rep_writer:start_link(self(), Target, Reader, PostProps),
 
@@ -213,7 +187,6 @@ do_init([RepId, {PostProps} = RepDoc, Us
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        doc_ids = DocIds,
         rep_doc = RepDoc
     },
     {ok, State}.
@@ -390,25 +363,6 @@ dbinfo(Db) ->
     {ok, Info} = couch_db:get_db_info(Db),
     Info.
 
-do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
-    #state{
-        listeners = Listeners,
-        rep_starttime = ReplicationStartTime,
-        stats = Stats
-    } = State,
-
-    RepByDocsJson = {[
-        {<<"start_time">>, ?l2b(ReplicationStartTime)},
-        {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
-        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-        {<<"doc_write_failures">>,
-            ets:lookup_element(Stats, doc_write_failures, 2)}
-    ]},
-
-    terminate_cleanup(State),
-    [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];
-
 do_terminate(State) ->
     #state{
         checkpoint_history = CheckpointHistory,
@@ -659,32 +613,53 @@ do_checkpoint(State) ->
         rep_starttime = ReplicationStartTime,
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
-        stats = Stats
+        stats = Stats,
+        rep_doc = {RepDoc}
     } = State,
     case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
         ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
             [dbname(Source), dbname(Target), NewSeqNum]),
+        EndTime = ?l2b(httpd_util:rfc1123_date()),
+        StartTime = ?l2b(ReplicationStartTime),
+        DocsRead = ets:lookup_element(Stats, docs_read, 2),
+        DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+        DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
         NewHistoryEntry = {[
             {<<"session_id">>, SessionId},
-            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
-            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_time">>, StartTime},
+            {<<"end_time">>, EndTime},
             {<<"start_last_seq">>, StartSeqNum},
             {<<"end_last_seq">>, NewSeqNum},
             {<<"recorded_seq">>, NewSeqNum},
             {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
             {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
-            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-            {<<"doc_write_failures">>,
-                ets:lookup_element(Stats, doc_write_failures, 2)}
+            {<<"docs_read">>, DocsRead},
+            {<<"docs_written">>, DocsWritten},
+            {<<"doc_write_failures">>, DocWriteFailures}
         ]},
-        % limit history to 50 entries
-        NewRepHistory = {[
+        BaseHistory = [
             {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeqNum},
-            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
-        ]},
+            {<<"source_last_seq">>, NewSeqNum}
+        ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+        undefined ->
+            [];
+        DocIds when is_list(DocIds) ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            [
+                {<<"start_time">>, StartTime},
+                {<<"end_time">>, EndTime},
+                {<<"docs_read">>, DocsRead},
+                {<<"docs_written">>, DocsWritten},
+                {<<"doc_write_failures">>, DocWriteFailures}
+            ]
+        end,
+        % limit history to 50 entries
+        NewRepHistory = {
+            BaseHistory ++
+            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+        },
 
         try
         {SrcRevPos,SrcRevId} =

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=1035780&r1=1035779&r2=1035780&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Tue Nov 16 20:12:05 2010
@@ -18,6 +18,7 @@
 -export([start_link/4, next/1, stop/1]).
 
 -define(BUFFER_SIZE, 1000).
+-define(DOC_IDS_FILTER_NAME, "_doc_ids").
 
 -include("couch_db.hrl").
 -include("../ibrowse/ibrowse.hrl").
@@ -36,6 +37,11 @@
     rows = queue:new()
 }).
 
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
 start_link(Parent, Source, StartSeq, PostProps) ->
     gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
 
@@ -46,9 +52,9 @@ stop(Server) ->
     catch gen_server:call(Server, stop),
     ok.
 
-init([Parent, #http_db{}=Source, Since, PostProps]) ->
+init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) ->
     process_flag(trap_exit, true),
-    Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+    Feed = case get_value(<<"continuous">>, PostProps, false) of
     false ->
         normal;
     true ->
@@ -60,33 +66,24 @@ init([Parent, #http_db{}=Source, Since, 
         {"since", Since},
         {"feed", Feed}
     ],
-    QS = case couch_util:get_value(<<"filter">>, PostProps) of
+    {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of
     undefined ->
-        BaseQS;
-    FilterName ->
-        {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
-        lists:foldr(
-            fun({K, V}, QSAcc) ->
-                Ks = couch_util:to_list(K),
-                case proplists:is_defined(Ks, QSAcc) of
-                true ->
-                    QSAcc;
-                false ->
-                    [{Ks, V} | QSAcc]
-                end
-            end,
-            [{"filter", FilterName} | BaseQS],
-            Params
-        )
+        {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0};
+    DocIds when is_list(DocIds) ->
+        Headers1 = [{"Content-Type", "application/json"} | Headers0],
+        QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS],
+        {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1}
     end,
     Pid = couch_rep_httpc:spawn_link_worker_process(Source),
     Req = Source#http_db{
+        method = Method,
+        body = Body,
         resource = "_changes",
         qs = QS,
         conn = Pid,
         options = [{stream_to, {self(), once}}] ++
                 lists:keydelete(inactivity_timeout, 1, Source#http_db.options),
-        headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+        headers = Headers -- [{"Accept-Encoding", "gzip"}]
     },
     {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
     Args = [Parent, Req, Since, PostProps],
@@ -123,11 +120,17 @@ init([Parent, #http_db{}=Source, Since, 
 init([_Parent, Source, Since, PostProps] = InitArgs) ->
     process_flag(trap_exit, true),
     Server = self(),
+    Filter = case get_value(<<"doc_ids">>, PostProps) of
+    undefined ->
+        ?b2l(get_value(<<"filter">>, PostProps, <<>>));
+    DocIds when is_list(DocIds) ->
+        ?DOC_IDS_FILTER_NAME
+    end,
     ChangesArgs = #changes_args{
         style = all_docs,
         since = Since,
-        filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)),
-        feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
+        filter = Filter,
+        feed = case get_value(<<"continuous">>, PostProps, false) of
             true ->
                 "continuous";
             false ->
@@ -138,7 +141,7 @@ init([_Parent, Source, Since, PostProps]
     ChangesPid = spawn_link(fun() ->
         ChangesFeedFun = couch_changes:handle_changes(
             ChangesArgs,
-            {json_req, filter_json_req(Source, PostProps)},
+            {json_req, filter_json_req(Filter, Source, PostProps)},
             Source
         ),
         ChangesFeedFun(fun({change, Change, _}, _) ->
@@ -149,29 +152,49 @@ init([_Parent, Source, Since, PostProps]
     end),
     {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
 
-filter_json_req(Db, PostProps) ->
-    case couch_util:get_value(<<"filter">>, PostProps) of
+maybe_add_filter_qs_params(PostProps, BaseQS) ->
+    case get_value(<<"filter">>, PostProps) of
     undefined ->
-        {[]};
+        BaseQS;
     FilterName ->
-        {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}),
-        {ok, Info} = couch_db:get_db_info(Db),
-        % simulate a request to db_name/_changes
-        {[
-            {<<"info">>, {Info}},
-            {<<"id">>, null},
-            {<<"method">>, 'GET'},
-            {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
-            {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
-            {<<"headers">>, []},
-            {<<"body">>, []},
-            {<<"peer">>, <<"replicator">>},
-            {<<"form">>, []},
-            {<<"cookie">>, []},
-            {<<"userCtx">>, couch_util:json_user_ctx(Db)}
-       ]}
+        {Params} = get_value(<<"query_params">>, PostProps, {[]}),
+        lists:foldr(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case proplists:is_defined(Ks, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, V} | QSAcc]
+                end
+            end,
+            [{"filter", FilterName} | BaseQS],
+            Params
+        )
     end.
 
+filter_json_req([], _Db, _PostProps) ->
+    {[]};
+filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) ->
+    {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]};
+filter_json_req(FilterName, Db, PostProps) ->
+    {Query} = get_value(<<"query_params">>, PostProps, {[]}),
+    {ok, Info} = couch_db:get_db_info(Db),
+    % simulate a request to db_name/_changes
+    {[
+        {<<"info">>, {Info}},
+        {<<"id">>, null},
+        {<<"method">>, 'GET'},
+        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+        {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+        {<<"headers">>, []},
+        {<<"body">>, []},
+        {<<"peer">>, <<"replicator">>},
+        {<<"form">>, []},
+        {<<"cookie">>, []},
+        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+    ]}.
+
 handle_call({add_change, Row}, From, State) ->
     handle_add_change(Row, From, State);
 

Modified: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=1035780&r1=1035779&r2=1035780&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Tue Nov 16 20:12:05 2010
@@ -43,15 +43,13 @@
     opened_seqs = []
 }).
 
-start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
-    gen_server:start_link(
-        ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], []
-    ).
+start_link(Parent, Source, MissingRevs, PostProps) ->
+    gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
 
 next(Pid) ->
     gen_server:call(Pid, next_docs, infinity).
 
-init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
+init([Parent, Source, MissingRevs, _PostProps]) ->
     process_flag(trap_exit, true),
     if is_record(Source, http_db) ->
         #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -59,15 +57,7 @@ init([Parent, Source, MissingRevs_or_Doc
         ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
     true -> ok end,
     Self = self(),
-    ReaderLoop = spawn_link(
-        fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
-    ),
-    MissingRevs = case MissingRevs_or_DocIds of
-    Pid when is_pid(Pid) ->
-        Pid;
-    _ListDocIds ->
-        nil
-    end,
+    ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
     State = #state{
         parent = Parent,
         source = Source,
@@ -183,8 +173,6 @@ handle_reader_loop_complete(#state{monit
 handle_reader_loop_complete(State) ->
     {noreply, State#state{complete = waiting_on_monitors}}.
 
-calculate_new_high_seq(#state{missing_revs=nil}) ->
-    nil;
 calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
     Open;
 calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -209,8 +197,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], B
 % opened seqs greater than the smallest outstanding request.  I believe its the
 % minimal set of info needed to correctly calculate which seqs have been
 % replicated (because remote docs can be opened out-of-order) -- APK
-update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
-    State;
 update_sequence_lists(Seq, State) ->
     Requested = lists:delete(Seq, State#state.requested_seqs),
     AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -261,44 +247,6 @@ open_doc_revs(#http_db{url = Url} = DbS,
     end,
     lists:reverse(lists:foldl(Transform, [], JsonResults)).
 
-open_doc(#http_db{url = Url} = DbS, DocId) ->
-    % get latest rev of the doc
-    Req = DbS#http_db{
-        resource=encode_doc_id(DocId),
-        qs=[{att_encoding_info, true}]
-    },
-    {Props} = Json = couch_rep_httpc:request(Req),
-    case couch_util:get_value(<<"_id">>, Props) of
-    Id when is_binary(Id) ->
-        #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
-        [Doc#doc{
-            atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
-        }];
-    undefined ->
-        Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
-        ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
-            [DocId, couch_util:url_strip_password(Url), Err]),
-        []
-    end.
-
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
-    case Source of
-    #http_db{} ->
-        [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
-            infinity) || Id <- DocIds];
-    _LocalDb ->
-        Docs = lists:foldr(fun(Id, Acc) ->
-            case couch_db:open_doc(Source, Id) of
-            {ok, Doc} ->
-                [Doc | Acc];
-            _ ->
-                Acc
-            end
-        end, [], DocIds),
-        gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity)
-    end,
-    exit(complete);
-    
 reader_loop(ReaderServer, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
@@ -332,8 +280,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} =
 maybe_reopen_db(Db, _HighSeq) ->
     Db.
 
-spawn_document_request(Source, Id, nil, nil) ->
-    spawn_document_request(Source, Id);
 spawn_document_request(Source, Id, Seq, Revs) ->
     Server = self(),
     SpawnFun = fun() ->
@@ -341,11 +287,3 @@ spawn_document_request(Source, Id, Seq, 
         gen_server:call(Server, {add_docs, Seq, Results}, infinity)
     end,
     spawn_monitor(SpawnFun).
-
-spawn_document_request(Source, Id) ->
-    Server = self(),
-    SpawnFun = fun() ->
-        Results = open_doc(Source, Id),
-        gen_server:call(Server, {add_docs, nil, Results}, infinity)
-    end,
-    spawn_monitor(SpawnFun).

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=1035780&r1=1035779&r2=1035780&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Tue Nov 16 20:12:05 2010
@@ -21,8 +21,6 @@ start_link(Parent, Target, Reader, _Post
 
 writer_loop(Parent, Reader, Target) ->
     case couch_rep_reader:next(Reader) of
-    {complete, nil} ->
-        ok;
     {complete, FinalSeq} ->
         Parent ! {writer_checkpoint, FinalSeq},
         ok;
@@ -40,12 +38,7 @@ writer_loop(Parent, Reader, Target) ->
             ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
             exit({attachment_request_failed, Err, Docs})
         end,
-        case HighSeq of
-        nil ->
-            ok;
-        _SeqNumber ->
-            Parent ! {writer_checkpoint, HighSeq}
-        end,
+        Parent ! {writer_checkpoint, HighSeq},
         couch_rep_att:cleanup(),
         couch_util:should_flush(),
         writer_loop(Parent, Reader, Target)



Mime
View raw message