From commits-return-4639-apmail-couchdb-commits-archive=couchdb.apache.org@couchdb.apache.org Sun Jul 04 21:18:15 2010 Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 25801 invoked from network); 4 Jul 2010 21:18:15 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 Jul 2010 21:18:15 -0000 Received: (qmail 58957 invoked by uid 500); 4 Jul 2010 21:18:15 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 58917 invoked by uid 500); 4 Jul 2010 21:18:14 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 58910 invoked by uid 99); 4 Jul 2010 21:18:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jul 2010 21:18:14 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jul 2010 21:18:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 800CD2388994; Sun, 4 Jul 2010 21:16:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r960398 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_httpd_rep.erl couch_replicate.erl Date: Sun, 04 Jul 2010 21:16:45 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100704211645.800CD2388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fdmanana Date: Sun Jul 4 21:16:42 2010 New Revision: 960398 URL: http://svn.apache.org/viewvc?rev=960398&view=rev Log: Code formatting only: - ensure lines are up to 80 characters wide; - make indentation more consistent; - added more white spaces to improve code readability; - removed trailling spaces Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=960398&r1=960397&r2=960398&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sun Jul 4 21:16:42 2010 @@ -63,17 +63,17 @@ get_db_info(#httpdb{url=Url,oauth=OAuth, ], infinity) of {ok, "200", _RespHeaders, Body} -> {Props} = ?JSON_DECODE(Body), - {ok, [{couch_util:to_existing_atom(K), V} || {K,V} <- Props]} + {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]} end; get_db_info(Db) -> couch_db:get_db_info(Db). -open_doc(#httpdb{url=Url,oauth=OAuth,headers=Headers}, DocId, Options) -> +open_doc(#httpdb{url=Url, oauth=OAuth, headers=Headers}, DocId, Options) -> Url2 = Url ++ couch_util:url_encode(DocId), QArgs = options_to_query_args(Options, []), Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers, - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), try ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, []), Headers2, get, [], [ @@ -94,73 +94,79 @@ update_doc(Db, Doc, Options) -> update_doc(Db,Doc,Options,interactive_edit). -ensure_full_commit(#httpdb{url=Url,oauth=OAuth,headers=Headers}) -> +ensure_full_commit(#httpdb{url=Url, oauth=OAuth, headers=Headers}) -> Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers, - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), - case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2, post, [], [ - {response_format,binary} - ], infinity) of + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), + case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2, + post, [], [{response_format, binary}], infinity) of {ok, "201", _RespHeaders, Body} -> catch ibrowse:stop_worker_process(Worker), {Props} = ?JSON_DECODE(Body), - {ok, couch_util:get_value(<<"instance_start_time">>,Props)} + {ok, couch_util:get_value(<<"instance_start_time">>, Props)} end; ensure_full_commit(Db) -> couch_db:ensure_full_commit(Db). -get_missing_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, IdRevs) -> +get_missing_revs(#httpdb{url=Url, oauth=OAuth, headers=Headers}, IdRevs) -> Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs], Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers, - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), case ibrowse:send_req_direct(Worker, Url ++ "_revs_diff", Headers2, post, - ?JSON_ENCODE({Json}), [ - {response_format,binary} - ], infinity) of + ?JSON_ENCODE({Json}), [{response_format, binary}], infinity) of {ok, "200", _RespHeaders, Body} -> catch ibrowse:stop_worker_process(Worker), {JsonResults} = ?JSON_DECODE(Body), ConvertToNativeFun = fun({Id, {Result}}) -> - {Id, - couch_doc:parse_revs(couch_util:get_value(<<"missing">>,Result)), + { + Id, couch_doc:parse_revs( - couch_util:get_value(<<"possible_ancestors">>, Result, []))} - end, - {ok, lists:map(ConvertToNativeFun,JsonResults)} + couch_util:get_value(<<"missing">>, Result) + ), + couch_doc:parse_revs( + couch_util:get_value(<<"possible_ancestors">>, Result, []) + ) + } + end, + {ok, lists:map(ConvertToNativeFun, JsonResults)} end; get_missing_revs(Db, IdRevs) -> couch_db:get_missing_revs(Db, IdRevs). -open_doc_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, Id, Revs, - Options, Fun, Acc) -> +open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> + #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb, Self = self(), - QArgs = [{"revs", "true"},{"open_revs", ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))} | - options_to_query_args(Options, [])], - IdEncoded = - case Id of - <<"_design/",RestId/binary>> -> + QArgs = [ + {"revs", "true"}, + {"open_revs", ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))} | + options_to_query_args(Options, []) + ], + IdEncoded = case Id of + <<"_design/", RestId/binary>> -> "_design/" ++ couch_util:url_encode(RestId); _ -> couch_util:url_encode(Id) end, - Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ [{"accept", "multipart/mixed"} | Headers], - Streamer = spawn_link(fun()-> + Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ + [{"accept", "multipart/mixed"} | Headers], + Streamer = spawn_link(fun() -> FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []), - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), - {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, Headers2, get, [], [ - {response_format,binary}, - {stream_to, {self(), once}} - ], infinity), - + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), + {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, + Headers2, get, [], + [{response_format, binary}, {stream_to, {self(), once}}], + infinity), + receive {ibrowse_async_headers, ReqId, "200", RespHeaders} -> CType = couch_util:get_value("Content-Type", RespHeaders), - couch_httpd:parse_multipart_request(CType, + couch_httpd:parse_multipart_request( + CType, fun() -> stream_data_self(ReqId) end, fun(Ev) -> mp_parse_mixed(Ev) end) end, @@ -172,76 +178,88 @@ open_doc_revs(Db, Id, Revs, Options, Fun {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options), {ok, lists:foldl(Fun, Acc, Results)}. - -update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) -> - QArgs = if Type == replicated_changes -> - [{"new_edits", "false"}]; true -> [] end ++ - options_to_query_args(Options, []), - +update_doc(#httpdb{} = HttpDb, Doc, Options, Type) -> + #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb, + QArgs = case Type of + replicated_changes -> + [{"new_edits", "false"}]; + _ -> + [] + end ++ options_to_query_args(Options, []), + Boundary = couch_uuids:random(), - JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])), + JsonBytes = ?JSON_ENCODE( + couch_doc:to_json_obj(Doc, [revs, attachments, follows | Options])), {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary, - JsonBytes, Doc#doc.atts, false), + JsonBytes, Doc#doc.atts, false), Self = self(), Headers2 = case lists:member(delay_commit, Options) of - true -> [{"X-Couch-Full-Commit", "false"}]; - false -> [] - end ++ [{"Content-Type", ?b2l(ContentType)}] ++ - oauth_header(Url, QArgs, put, OAuth) ++ Headers, + true -> + [{"X-Couch-Full-Commit", "false"}]; + false -> + [] + end ++ [{"Content-Type", ?b2l(ContentType)}] ++ + oauth_header(Url, QArgs, put, OAuth) ++ Headers, + Ref = make_ref(), % this streams the doc data to the ibrowse requester DocStreamer = spawn_link(fun() -> - couch_doc:doc_to_multi_part_stream(Boundary, - JsonBytes, Doc#doc.atts, - fun(Data) -> - receive {get_data, Ref, Pid} -> - Pid ! {data, Ref, Data} - end - end, - false), - unlink(Self) - end), - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), - case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs, []), - [{"Content-Length",Len}|Headers2], put, - {fun(0) -> - eof; - (LenLeft) when LenLeft > 0 -> - DocStreamer ! {get_data, Ref, self()}, - receive {data, Ref, Data} -> - {ok, Data, LenLeft - iolist_size(Data)} + couch_doc:doc_to_multi_part_stream(Boundary, + JsonBytes, Doc#doc.atts, + fun(Data) -> + receive {get_data, Ref, Pid} -> + Pid ! {data, Ref, Data} end - end, Len}, [], infinity) of + end, + false), + unlink(Self) + end), + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), + SendFun = fun(0) -> + eof; + (LenLeft) when LenLeft > 0 -> + DocStreamer ! {get_data, Ref, self()}, + receive {data, Ref, Data} -> + {ok, Data, LenLeft - iolist_size(Data)} + end + end, + case ibrowse:send_req_direct(Worker, + Url ++ couch_util:url_encode(Doc#doc.id) ++ + query_args_to_string(QArgs, []), [{"Content-Length", Len} | Headers2], + put, {SendFun, Len}, [], infinity) of {ok, [$2,$0, _], _RespHeaders, Body} -> catch ibrowse:stop_worker_process(Worker), {Props} = ?JSON_DECODE(Body), {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))} end; -update_doc(Db,Doc,Options,Type) -> - couch_db:update_doc(Db,Doc,Options,Type). +update_doc(Db, Doc, Options, Type) -> + couch_db:update_doc(Db, Doc, Options, Type). -changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style, - StartSeq, UserFun, Acc) -> +changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc) -> + #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb, Url2 = Url ++ "_changes", - QArgs = [{"style", atom_to_list(Style)}, - {"since", integer_to_list(StartSeq)}], + QArgs = [ + {"style", atom_to_list(Style)}, + {"since", integer_to_list(StartSeq)} + ], Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers, - #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port), - {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, ""), - Headers2, get, [], [ - {response_format,binary}, - {stream_to, {self(), once}}], infinity), + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), + {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), + + {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, + Url2 ++ query_args_to_string(QArgs, ""), Headers2, get, [], + [{response_format, binary}, {stream_to, {self(), once}}], infinity), + DataFun = fun() -> - receive {ibrowse_async_headers, ReqId, "200", _Headers} -> - stream_data_self(ReqId) - end - end, + receive {ibrowse_async_headers, ReqId, "200", _Headers} -> + stream_data_self(ReqId) + end + end, EventFun = fun(Ev) -> - changes_ev1(Ev, UserFun, Acc) - end, + changes_ev1(Ev, UserFun, Acc) + end, try json_stream_parse:events(DataFun, EventFun) after @@ -255,23 +273,23 @@ changes_since(Db, Style, StartSeq, UserF options_to_query_args([], Acc) -> lists:reverse(Acc); -options_to_query_args([delay_commit|Rest], Acc) -> +options_to_query_args([delay_commit | Rest], Acc) -> options_to_query_args(Rest, Acc); -options_to_query_args([{atts_since,[]}|Rest], Acc) -> +options_to_query_args([{atts_since, []} | Rest], Acc) -> options_to_query_args(Rest, Acc); -options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) -> +options_to_query_args([{atts_since, PossibleAncestors} | Rest], Acc) -> % NOTE, we should limit the # of PossibleAncestors sent. Since a large % # can exceed the max URL length. Limiting the # only results in % attachments being fully copied from source to target, instead of % incrementally. - options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE( - couch_doc:revs_to_strs(PossibleAncestors))} | Acc]). + AncestorsJson = ?JSON_ENCODE(couch_doc:revs_to_strs(PossibleAncestors)), + options_to_query_args(Rest, [{"atts_since", AncestorsJson} | Acc]). query_args_to_string([], []) -> ""; query_args_to_string([], Acc) -> "?" ++ string:join(lists:reverse(Acc), "&"); -query_args_to_string([{K,V}|Rest], Acc) -> +query_args_to_string([{K, V} | Rest], Acc) -> query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]). receive_docs(Streamer, UserFun, UserAcc) -> @@ -281,7 +299,7 @@ receive_docs(Streamer, UserFun, UserAcc) case couch_util:get_value("content-type", Headers) of {"multipart/related", _} = ContentType -> case couch_doc:doc_from_multi_part_stream(ContentType, - fun() -> receive_doc_data(Streamer) end) of + fun() -> receive_doc_data(Streamer) end) of {ok, Doc} -> UserAcc2 = UserFun({ok, Doc}, UserAcc), receive_docs(Streamer, UserFun, UserAcc2) @@ -310,7 +328,6 @@ receive_all(Streamer, Acc)-> body_done -> lists:reverse(Acc) end. - receive_doc_data(Streamer)-> @@ -320,7 +337,7 @@ receive_doc_data(Streamer)-> {Bytes, fun() -> receive_doc_data(Streamer) end}; body_done -> {<<>>, fun() -> receive_doc_data(Streamer) end} - end. + end. mp_parse_mixed(eof) -> @@ -364,7 +381,7 @@ changes_ev1(object_start, UserFun, UserA fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end. changes_ev2({key, <<"results">>}, UserFun, UserAcc) -> - fun(Ev)-> changes_ev3(Ev, UserFun, UserAcc) end; + fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end; changes_ev2(_, UserFun, UserAcc) -> fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end. @@ -374,10 +391,10 @@ changes_ev3(array_start, UserFun, UserAc changes_ev_loop(object_start, UserFun, UserAcc) -> fun(Ev) -> json_stream_parse:collect_object(Ev, - fun(Obj) -> - UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc), - fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end - end) + fun(Obj) -> + UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc), + fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end + end) end; changes_ev_loop(array_end, _UserFun, _UserAcc) -> fun(_Ev) -> changes_ev_done() end. @@ -389,27 +406,28 @@ json_to_doc_info({Props}) -> Id = couch_util:get_value(<<"id">>, Props), Seq = couch_util:get_value(<<"seq">>, Props), Changes = couch_util:get_value(<<"changes">>, Props), - + RevsInfo = lists:map( fun({Change}) -> Rev = couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Change)), Del = ("true" == couch_util:get_value(<<"deleted">>, Change)), - #rev_info{rev=Rev,deleted=Del} + #rev_info{rev=Rev, deleted=Del} end, Changes), - #doc_info{id=Id,high_seq=Seq,revs=RevsInfo}. + #doc_info{id=Id, high_seq=Seq, revs=RevsInfo}. oauth_header(_Url, _QS, _Action, nil) -> []; oauth_header(Url, QS, Action, OAuth) -> - Consumer = - {OAuth#oauth.consumer_key, - OAuth#oauth.consumer_secret, - OAuth#oauth.signature_method}, + Consumer = { + OAuth#oauth.consumer_key, + OAuth#oauth.consumer_secret, + OAuth#oauth.signature_method + }, Method = case Action of - get -> "GET"; - post -> "POST"; - put -> "PUT"; - head -> "HEAD" + get -> "GET"; + post -> "POST"; + put -> "PUT"; + head -> "HEAD" end, Params = oauth:signed_params(Method, Url, QS, Consumer, #oauth.token, @@ -417,4 +435,3 @@ oauth_header(Url, QS, Action, OAuth) -> [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}]. - \ No newline at end of file Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=960398&r1=960397&r2=960398&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Sun Jul 4 21:16:42 2010 @@ -15,17 +15,20 @@ -include("couch_db.hrl"). -include("couch_api_wrap.hrl"). --import(couch_httpd, - [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, - start_json_response/2,start_json_response/3, - send_chunk/2,last_chunk/1,end_json_response/1, - start_chunked_response/3, absolute_uri/2, send/2, - start_response_length/4]). +-import(couch_httpd, [ + send_json/2, send_json/3, send_json/4, + send/2, + send_method_not_allowed/2, + start_response_length/4, + start_json_response/2, start_json_response/3, end_json_response/1, + start_chunked_response/3, send_chunk/2, last_chunk/1, + absolute_uri/2 +]). -export([handle_req/1]). -handle_req(#httpd{method='POST'}=Req) -> +handle_req(#httpd{method='POST'} = Req) -> {PostBody} = couch_httpd:json_body_obj(Req), SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)), TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)), @@ -48,7 +51,7 @@ parse_rep_db({Props}) -> Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}), {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), - Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], + Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders], case couch_util:get_value(<<"oauth">>, AuthProps) of undefined -> @@ -78,27 +81,27 @@ parse_rep_db({Props}) -> oauth = OAuth, headers = Headers }; -parse_rep_db(<<"http://",_/binary>>=Url) -> - parse_rep_db({[{<<"url">>,Url}]}); -parse_rep_db(<<"https://",_/binary>>=Url) -> - parse_rep_db({[{<<"url">>,Url}]}); +parse_rep_db(<<"http://", _/binary>> = Url) -> + parse_rep_db({[{<<"url">>, Url}]}); +parse_rep_db(<<"https://", _/binary>> = Url) -> + parse_rep_db({[{<<"url">>, Url}]}); parse_rep_db(<>) -> DbName. convert_options([])-> []; -convert_options([{<<"cancel">>, V}|R])-> - [{cancel, V}|convert_options(R)]; -convert_options([{<<"create_target">>, V}|R])-> - [{create_target, V}|convert_options(R)]; -convert_options([{<<"continuous">>, V}|R])-> - [{continuous, V}|convert_options(R)]; -convert_options([{<<"filter">>, V}|R])-> - [{filter, V}|convert_options(R)]; -convert_options([{<<"query_params">>, V}|R])-> - [{query_params, V}|convert_options(R)]; -convert_options([_|R])-> % skip unknown option +convert_options([{<<"cancel">>, V} | R]) -> + [{cancel, V} | convert_options(R)]; +convert_options([{<<"create_target">>, V} | R]) -> + [{create_target, V} | convert_options(R)]; +convert_options([{<<"continuous">>, V} | R]) -> + [{continuous, V} | convert_options(R)]; +convert_options([{<<"filter">>, V} | R]) -> + [{filter, V} | convert_options(R)]; +convert_options([{<<"query_params">>, V} | R]) -> + [{query_params, V} | convert_options(R)]; +convert_options([_ | R]) -> % skip unknown option convert_options(R). Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=960398&r1=960397&r2=960398&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sun Jul 4 21:16:42 2010 @@ -51,7 +51,7 @@ start(Src, Tgt, Options, UserCtx) -> % initalize the replication state, looking for existing rep records % for incremental replication. - #rep_state{source=Source,target=Target,start_seq=StartSeq} = State = + #rep_state{source=Source, target=Target, start_seq=StartSeq} = State = init_state(Src, Tgt, Options, UserCtx), % Create the work queues @@ -117,7 +117,7 @@ init_state(Src,Tgt,Options,UserCtx)-> src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo) }, - State#rep_state{timer = erlang:start_timer(checkpoint_interval(State), + State#rep_state{timer = erlang:start_timer(checkpoint_interval(State), self(), timed_checkpoint)}. @@ -125,7 +125,7 @@ spawn_changes_reader(Cp, StartSeq, Sourc spawn_link( fun()-> couch_api_wrap:changes_since(Source, all_docs, StartSeq, - fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)-> + fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo, _) -> Cp ! {seq_start, {Seq, length(Revs)}}, Cp ! {add_stat, {#stats.missing_checked, length(Revs)}}, ok = couch_work_queue:queue(ChangesQueue, DocInfo), @@ -153,7 +153,7 @@ missing_revs_finder_loop(Cp, couch_work_queue:close(MissingRevsQueue); {ok, DocInfos} -> IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} || - #doc_info{id=Id,revs=RevsInfo} <- DocInfos], + #doc_info{id=Id, revs=RevsInfo} <- DocInfos], {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs), % Figured out which on the target are missing. % Missing contains the id and revs missing, and any possible @@ -165,29 +165,27 @@ missing_revs_finder_loop(Cp, % now complete. IdRevsSeqDict = dict:from_list( [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} || - #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]), + #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]), NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing), % signal the completion of these that aren't missing - lists:foreach(fun({_Id, {Revs, Seq}})-> + lists:foreach(fun({_Id, {Revs, Seq}}) -> Cp ! {seq_changes_done, {Seq, length(Revs)}} end, dict:to_list(NonMissingIdRevsSeqDict)), % Expand out each docs and seq into it's own work item - lists:foreach(fun({Id, Revs, PAs})-> + lists:foreach(fun({Id, Revs, PAs}) -> % PA means "possible ancestor" Cp ! {add_stat, {#stats.missing_found, length(Revs)}}, {_, Seq} = dict:fetch(Id, IdRevsSeqDict), - ok = couch_work_queue:queue(MissingRevsQueue, - {Id, Revs, PAs, Seq}) + ok = couch_work_queue:queue(MissingRevsQueue, {Id, Revs, PAs, Seq}) end, Missing), - missing_revs_finder_loop(Cp, Target, ChangesQueue, - MissingRevsQueue) + missing_revs_finder_loop(Cp, Target, ChangesQueue, MissingRevsQueue) end. remove_missing(IdRevsSeqDict, []) -> IdRevsSeqDict; -remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) -> +remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) -> {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict), case AllChangedRevs -- MissingRevs of [] -> @@ -213,28 +211,28 @@ doc_copy_loop(Cp, Source, Target, Missin closed -> Cp ! done; {ok, [{Id, Revs, PossibleAncestors, Seq}]} -> + DocFun = fun({ok, Doc}, _) -> + % we are called for every rev read on the source + Cp ! {add_stat, {#stats.docs_read, 1}}, + % now write the doc to the target. + case couch_api_wrap:update_doc(Target, Doc, [], + replicated_changes) of + {ok, _} -> + Cp ! {add_stat, {#stats.docs_written, 1}}; + _Error -> + Cp ! {add_stat, {#stats.doc_write_failures, 1}} + end; + (_, _) -> + ok + end, couch_api_wrap:open_doc_revs(Source, Id, Revs, - [{atts_since,PossibleAncestors}], - fun({ok, Doc}, _) -> - % we are called for every rev read on the source - Cp ! {add_stat, {#stats.docs_read, 1}}, - % now write the doc to the target. - case couch_api_wrap:update_doc(Target, Doc, [], - replicated_changes) of - {ok, _} -> - Cp ! {add_stat, {#stats.docs_written, 1}}; - _Error -> - Cp ! {add_stat, {#stats.doc_write_failures, 1}} - end; - (_, _) -> - ok - end, []), + [{atts_since, PossibleAncestors}], DocFun, []), Cp ! {seq_changes_done, {Seq, length(Revs)}}, doc_copy_loop(Cp, Source, Target, MissingRevsQueue) end. checkpoint_loop(State, SeqsInProgress, Stats) -> - % SeqsInProgress contains the number of revs for each seq foiund by the + % SeqsInProgress contains the number of revs for each seq found by the % changes process. receive {seq_start, {Seq, NumChanges}} -> @@ -246,7 +244,7 @@ checkpoint_loop(State, SeqsInProgress, S TotalChanges = gb_trees:get(Seq, SeqsInProgress), case TotalChanges - NumChangesDone of 0 -> - % this seq is completely processed. Chck to see if it was the + % this seq is completely processed. Check to see if it was the % smallest seq in progess. If so, we've made progress that can % be checkpointed. State2 = @@ -256,8 +254,7 @@ checkpoint_loop(State, SeqsInProgress, S _ -> State end, - checkpoint_loop(State2, - gb_trees:delete(Seq,SeqsInProgress), Stats); + checkpoint_loop(State2, gb_trees:delete(Seq,SeqsInProgress), Stats); NewTotalChanges when NewTotalChanges > 0 -> % Still some changes that need work done. Put the new count back. SeqsInProgress2 = @@ -283,7 +280,7 @@ checkpoint_loop(State, SeqsInProgress, S % every checkpoint interval while processing State2 = do_checkpoint(State, Stats), Timer = erlang:start_timer(checkpoint_interval(State), - self(), timed_checkpoint), + self(), timed_checkpoint), checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats) end. @@ -291,7 +288,7 @@ checkpoint_loop(State, SeqsInProgress, S checkpoint_interval(_State) -> 5000. -do_checkpoint(#rep_state{current_through_seq=Seq,committed_seq=OldSeq}=State, +do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=OldSeq} = State, _Stats) when Seq == OldSeq -> State; do_checkpoint(State, Stats) -> @@ -335,20 +332,20 @@ do_checkpoint(State, Stats) -> ]}, try - {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source, + {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source, SourceLog#doc{body=NewRepHistory}, [delay_commit]), - {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target, + {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target, TargetLog#doc{body=NewRepHistory}, [delay_commit]), - State#rep_state{ - checkpoint_history = NewRepHistory, - committed_seq = NewSeq, - source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, - target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} - } + State#rep_state{ + checkpoint_history = NewRepHistory, + committed_seq = NewSeq, + source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, + target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} + } catch throw:conflict -> - ?LOG_ERROR("checkpoint failure: conflict (are you replicating to " - "yourself?)", []), - State + ?LOG_ERROR("checkpoint failure: conflict (are you replicating to " + "yourself?)", []), + State end; _Else -> ?LOG_INFO("rebooting ~p -> ~p from last known replication checkpoint", @@ -360,8 +357,10 @@ do_checkpoint(State, Stats) -> commit_to_both(Source, Target) -> % commit the src async ParentPid = self(), - SrcCommitPid = spawn_link(fun() -> - ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)} end), + SrcCommitPid = spawn_link( + fun() -> + ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)} + end), % commit tgt sync {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target), @@ -393,7 +392,7 @@ make_replication_id(Source, Target, User end, couch_util:to_hex(erlang:md5(term_to_binary(Base))). -get_rep_endpoint(_UserCtx, #httpdb{url=Url,headers=Headers,oauth=OAuth}) -> +get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) -> case OAuth of nil -> {remote, Url, Headers}; @@ -428,7 +427,7 @@ compare_replication_logs(SrcDoc, TgtDoc) compare_rep_history(S, T) when S =:= [] orelse T =:= [] -> ?LOG_INFO("no common ancestry -- performing full replication", []), {0, []}; -compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) -> +compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) -> SourceId = couch_util:get_value(<<"session_id">>, S), case has_session_id(SourceId, Target) of true ->