couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r960398 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_httpd_rep.erl couch_replicate.erl
Date Sun, 04 Jul 2010 21:16:45 GMT
Author: fdmanana
Date: Sun Jul  4 21:16:42 2010
New Revision: 960398

URL: http://svn.apache.org/viewvc?rev=960398&view=rev
Log:
Code formatting only:

- ensure lines are up to 80 characters wide;
- make indentation more consistent;
- added more white spaces to improve code readability;
- removed trailing spaces


Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=960398&r1=960397&r2=960398&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sun Jul  4 21:16:42 2010
@@ -63,17 +63,17 @@ get_db_info(#httpdb{url=Url,oauth=OAuth,
            ], infinity) of
     {ok, "200", _RespHeaders, Body} ->
         {Props} = ?JSON_DECODE(Body),
-       {ok, [{couch_util:to_existing_atom(K), V} || {K,V} <- Props]}
+        {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]}
     end;
 get_db_info(Db) ->
     couch_db:get_db_info(Db).
 
 
-open_doc(#httpdb{url=Url,oauth=OAuth,headers=Headers}, DocId, Options) ->
+open_doc(#httpdb{url=Url, oauth=OAuth, headers=Headers}, DocId, Options) ->
     Url2 = Url ++ couch_util:url_encode(DocId),
     QArgs = options_to_query_args(Options, []),
     Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
     {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
     try ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, []), 
             Headers2, get, [], [ 
@@ -94,73 +94,79 @@ update_doc(Db, Doc, Options) ->
     update_doc(Db,Doc,Options,interactive_edit).
 
 
-ensure_full_commit(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
+ensure_full_commit(#httpdb{url=Url, oauth=OAuth, headers=Headers}) ->
     Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-    case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2, post, [], [ 
-           {response_format,binary}
-           ], infinity) of
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+    case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2,
+            post, [], [{response_format, binary}], infinity) of
     {ok, "201", _RespHeaders, Body} ->
         catch ibrowse:stop_worker_process(Worker),
         {Props} = ?JSON_DECODE(Body),
-       {ok, couch_util:get_value(<<"instance_start_time">>,Props)}
+       {ok, couch_util:get_value(<<"instance_start_time">>, Props)}
     end;
 ensure_full_commit(Db) ->
     couch_db:ensure_full_commit(Db).
 
 
-get_missing_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, IdRevs) ->
+get_missing_revs(#httpdb{url=Url, oauth=OAuth, headers=Headers}, IdRevs) ->
     Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs],
     Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
     case ibrowse:send_req_direct(Worker, Url ++ "_revs_diff", Headers2, post,
-            ?JSON_ENCODE({Json}), [ 
-           {response_format,binary}
-           ], infinity) of
+            ?JSON_ENCODE({Json}), [{response_format, binary}], infinity) of
     {ok, "200", _RespHeaders, Body} ->
         catch ibrowse:stop_worker_process(Worker),
         {JsonResults} = ?JSON_DECODE(Body),
         ConvertToNativeFun = fun({Id, {Result}}) ->
-                {Id,
-                couch_doc:parse_revs(couch_util:get_value(<<"missing">>,Result)),
+            {
+                Id,
                 couch_doc:parse_revs(
-                    couch_util:get_value(<<"possible_ancestors">>, Result, []))}
-            end,
-        {ok, lists:map(ConvertToNativeFun,JsonResults)}
+                    couch_util:get_value(<<"missing">>, Result)
+                ),
+                couch_doc:parse_revs(
+                    couch_util:get_value(<<"possible_ancestors">>, Result, [])
+                )
+            }
+        end,
+        {ok, lists:map(ConvertToNativeFun, JsonResults)}
     end;
 get_missing_revs(Db, IdRevs) ->
     couch_db:get_missing_revs(Db, IdRevs).
 
 
 
-open_doc_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, Id, Revs, 
-        Options, Fun, Acc) ->
+open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
+    #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb,
     Self = self(),
-    QArgs = [{"revs", "true"},{"open_revs", ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))} |
-        options_to_query_args(Options, [])],
-    IdEncoded =
-    case Id of
-    <<"_design/",RestId/binary>> ->
+    QArgs = [
+        {"revs", "true"},
+        {"open_revs", ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))} |
+        options_to_query_args(Options, [])
+    ],
+    IdEncoded = case Id of
+    <<"_design/", RestId/binary>> ->
         "_design/" ++ couch_util:url_encode(RestId);
     _ ->
         couch_util:url_encode(Id)
     end,
-    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ [{"accept", "multipart/mixed"} | Headers],
-    Streamer = spawn_link(fun()->
+    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++
+        [{"accept", "multipart/mixed"} | Headers],
+    Streamer = spawn_link(fun() ->
             FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []),
-            #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-            {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, Headers2, get, [], [
-                {response_format,binary},
-                {stream_to, {self(), once}}
-                ], infinity),
-            
+            #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+            {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl,
+                Headers2, get, [],
+                [{response_format, binary}, {stream_to, {self(), once}}],
+                infinity),
+
             receive
             {ibrowse_async_headers, ReqId, "200", RespHeaders} ->
                 CType = couch_util:get_value("Content-Type", RespHeaders),
-                couch_httpd:parse_multipart_request(CType, 
+                couch_httpd:parse_multipart_request(
+                    CType,
                     fun() -> stream_data_self(ReqId) end,
                     fun(Ev) -> mp_parse_mixed(Ev) end)
             end,
@@ -172,76 +178,88 @@ open_doc_revs(Db, Id, Revs, Options, Fun
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
     {ok, lists:foldl(Fun, Acc, Results)}.
 
-    
 
-update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
-    QArgs = if Type == replicated_changes ->
-        [{"new_edits", "false"}]; true -> [] end ++ 
-        options_to_query_args(Options, []),
-    
+update_doc(#httpdb{} = HttpDb, Doc, Options, Type) ->
+    #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
+    QArgs = case Type of
+    replicated_changes ->
+        [{"new_edits", "false"}];
+    _ ->
+        []
+    end ++ options_to_query_args(Options, []),
+
     Boundary = couch_uuids:random(),
-    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
+    JsonBytes = ?JSON_ENCODE(
+        couch_doc:to_json_obj(Doc, [revs, attachments, follows | Options])),
     {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
-            JsonBytes, Doc#doc.atts, false),
+        JsonBytes, Doc#doc.atts, false),
     Self = self(),
     Headers2 = case lists:member(delay_commit, Options) of 
-            true -> [{"X-Couch-Full-Commit", "false"}];
-            false ->  []
-            end ++ [{"Content-Type", ?b2l(ContentType)}] ++ 
-            oauth_header(Url, QArgs, put, OAuth) ++ Headers,
+    true ->
+        [{"X-Couch-Full-Commit", "false"}];
+    false ->
+        []
+    end ++ [{"Content-Type", ?b2l(ContentType)}] ++
+        oauth_header(Url, QArgs, put, OAuth) ++ Headers,
+
     Ref = make_ref(),
     % this streams the doc data to the ibrowse requester
     DocStreamer = spawn_link(fun() ->
-                couch_doc:doc_to_multi_part_stream(Boundary,
-                    JsonBytes, Doc#doc.atts,
-                    fun(Data) ->
-                        receive {get_data, Ref, Pid} ->
-                            Pid ! {data, Ref, Data}
-                        end
-                    end,
-                    false),
-                unlink(Self)
-            end),
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-    case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs, []),
-            [{"Content-Length",Len}|Headers2], put, 
-            {fun(0) ->
-                eof;
-             (LenLeft) when LenLeft > 0 ->
-                DocStreamer ! {get_data, Ref, self()},
-                receive {data, Ref, Data} ->
-                    {ok, Data, LenLeft - iolist_size(Data)}
+        couch_doc:doc_to_multi_part_stream(Boundary,
+            JsonBytes, Doc#doc.atts,
+            fun(Data) ->
+                receive {get_data, Ref, Pid} ->
+                    Pid ! {data, Ref, Data}
                 end
-            end, Len}, [], infinity) of
+            end,
+            false),
+            unlink(Self)
+    end),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+    SendFun = fun(0) ->
+            eof;
+        (LenLeft) when LenLeft > 0 ->
+            DocStreamer ! {get_data, Ref, self()},
+            receive {data, Ref, Data} ->
+                {ok, Data, LenLeft - iolist_size(Data)}
+            end
+    end,
+    case ibrowse:send_req_direct(Worker,
+        Url ++ couch_util:url_encode(Doc#doc.id) ++
+        query_args_to_string(QArgs, []), [{"Content-Length", Len} | Headers2],
+        put, {SendFun, Len}, [], infinity) of
     {ok, [$2,$0, _], _RespHeaders, Body} ->
         catch ibrowse:stop_worker_process(Worker),
         {Props} = ?JSON_DECODE(Body),
         {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
     end;
-update_doc(Db,Doc,Options,Type) ->
-    couch_db:update_doc(Db,Doc,Options,Type).
+update_doc(Db, Doc, Options, Type) ->
+    couch_db:update_doc(Db, Doc, Options, Type).
 
-changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
-        StartSeq, UserFun, Acc) ->
+changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc) ->
+    #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
     Url2 = Url ++ "_changes",
-    QArgs = [{"style", atom_to_list(Style)},
-            {"since", integer_to_list(StartSeq)}],
+    QArgs = [
+        {"style", atom_to_list(Style)},
+        {"since", integer_to_list(StartSeq)}
+    ],
     Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, ""),
-            Headers2, get, [], [
-            {response_format,binary},
-            {stream_to, {self(), once}}], infinity),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+
+    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker,
+        Url2 ++ query_args_to_string(QArgs, ""), Headers2, get, [],
+        [{response_format, binary}, {stream_to, {self(), once}}], infinity),
+
     DataFun = fun() ->
-            receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
-                stream_data_self(ReqId)
-            end
-        end,
+        receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
+            stream_data_self(ReqId)
+        end
+    end,
     EventFun = fun(Ev) ->
-            changes_ev1(Ev, UserFun, Acc)
-        end,
+        changes_ev1(Ev, UserFun, Acc)
+    end,
     try
         json_stream_parse:events(DataFun, EventFun)
     after
@@ -255,23 +273,23 @@ changes_since(Db, Style, StartSeq, UserF
 
 options_to_query_args([], Acc) ->
     lists:reverse(Acc);
-options_to_query_args([delay_commit|Rest], Acc) ->
+options_to_query_args([delay_commit | Rest], Acc) ->
     options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,[]}|Rest], Acc) ->
+options_to_query_args([{atts_since, []} | Rest], Acc) ->
     options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
+options_to_query_args([{atts_since, PossibleAncestors} | Rest], Acc) ->
     % NOTE, we should limit the # of PossibleAncestors sent. Since a large
     % # can exceed the max URL length. Limiting the # only results in
     % attachments being fully copied from source to target, instead of
     % incrementally.
-    options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
-            couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
+    AncestorsJson = ?JSON_ENCODE(couch_doc:revs_to_strs(PossibleAncestors)),
+    options_to_query_args(Rest, [{"atts_since", AncestorsJson} | Acc]).
 
 query_args_to_string([], []) ->
     "";
 query_args_to_string([], Acc) ->
     "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K,V}|Rest], Acc) ->
+query_args_to_string([{K, V} | Rest], Acc) ->
     query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
 
 receive_docs(Streamer, UserFun, UserAcc) ->
@@ -281,7 +299,7 @@ receive_docs(Streamer, UserFun, UserAcc)
         case couch_util:get_value("content-type", Headers) of
         {"multipart/related", _} = ContentType ->
             case couch_doc:doc_from_multi_part_stream(ContentType, 
-                 fun() -> receive_doc_data(Streamer) end) of
+                fun() -> receive_doc_data(Streamer) end) of
             {ok, Doc} ->
                 UserAcc2 = UserFun({ok, Doc}, UserAcc),
                 receive_docs(Streamer, UserFun, UserAcc2)
@@ -310,7 +328,6 @@ receive_all(Streamer, Acc)->
     body_done ->
         lists:reverse(Acc)
      end.
-    
 
 
 receive_doc_data(Streamer)->    
@@ -320,7 +337,7 @@ receive_doc_data(Streamer)->    
         {Bytes, fun() -> receive_doc_data(Streamer) end};
     body_done ->
         {<<>>, fun() -> receive_doc_data(Streamer) end}
-     end.
+    end.
 
 
 mp_parse_mixed(eof) ->
@@ -364,7 +381,7 @@ changes_ev1(object_start, UserFun, UserA
     fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
 
 changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
-    fun(Ev)-> changes_ev3(Ev, UserFun, UserAcc) end;
+    fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
 changes_ev2(_, UserFun, UserAcc) ->
     fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
 
@@ -374,10 +391,10 @@ changes_ev3(array_start, UserFun, UserAc
 changes_ev_loop(object_start, UserFun, UserAcc) ->
     fun(Ev) ->
         json_stream_parse:collect_object(Ev,
-                fun(Obj) ->
-                    UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
-                    fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
-                end)
+            fun(Obj) ->
+                UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
+                fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
+            end)
     end;
 changes_ev_loop(array_end, _UserFun, _UserAcc) ->
     fun(_Ev) -> changes_ev_done() end.
@@ -389,27 +406,28 @@ json_to_doc_info({Props}) ->
     Id = couch_util:get_value(<<"id">>, Props),
     Seq = couch_util:get_value(<<"seq">>, Props),
     Changes = couch_util:get_value(<<"changes">>, Props),
-    
+
     RevsInfo = lists:map(
         fun({Change}) ->
             Rev = couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Change)),
             Del = ("true" == couch_util:get_value(<<"deleted">>, Change)),
-            #rev_info{rev=Rev,deleted=Del}
+            #rev_info{rev=Rev, deleted=Del}
         end, Changes),
-    #doc_info{id=Id,high_seq=Seq,revs=RevsInfo}.
+    #doc_info{id=Id, high_seq=Seq, revs=RevsInfo}.
 
 oauth_header(_Url, _QS, _Action, nil) ->
     [];
 oauth_header(Url, QS, Action, OAuth) ->
-    Consumer =
-            {OAuth#oauth.consumer_key,
-            OAuth#oauth.consumer_secret,
-            OAuth#oauth.signature_method},
+    Consumer = {
+        OAuth#oauth.consumer_key,
+        OAuth#oauth.consumer_secret,
+        OAuth#oauth.signature_method
+    },
     Method = case Action of
-        get -> "GET";
-        post -> "POST";
-        put -> "PUT";
-        head -> "HEAD"
+    get -> "GET";
+    post -> "POST";
+    put -> "PUT";
+    head -> "HEAD"
     end,
     Params = oauth:signed_params(Method, Url, QS, Consumer, 
         #oauth.token,
@@ -417,4 +435,3 @@ oauth_header(Url, QS, Action, OAuth) ->
     [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}].
 
 
-    
\ No newline at end of file

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=960398&r1=960397&r2=960398&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Sun Jul  4 21:16:42 2010
@@ -15,17 +15,20 @@
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 
--import(couch_httpd,
-    [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
-    start_json_response/2,start_json_response/3,
-    send_chunk/2,last_chunk/1,end_json_response/1,
-    start_chunked_response/3, absolute_uri/2, send/2,
-    start_response_length/4]).
+-import(couch_httpd, [
+    send_json/2, send_json/3, send_json/4,
+    send/2,
+    send_method_not_allowed/2,
+    start_response_length/4,
+    start_json_response/2, start_json_response/3, end_json_response/1,
+    start_chunked_response/3, send_chunk/2, last_chunk/1,
+    absolute_uri/2
+]).
     
 -export([handle_req/1]).
 
 
-handle_req(#httpd{method='POST'}=Req) ->
+handle_req(#httpd{method='POST'} = Req) ->
     {PostBody} = couch_httpd:json_body_obj(Req),
     SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
     TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
@@ -48,7 +51,7 @@ parse_rep_db({Props}) ->
     Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
     {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
     {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
-    Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+    Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
     
     case couch_util:get_value(<<"oauth">>, AuthProps) of
     undefined ->
@@ -78,27 +81,27 @@ parse_rep_db({Props}) ->
         oauth = OAuth,
         headers = Headers
     };
-parse_rep_db(<<"http://",_/binary>>=Url) ->
-    parse_rep_db({[{<<"url">>,Url}]});
-parse_rep_db(<<"https://",_/binary>>=Url) ->
-    parse_rep_db({[{<<"url">>,Url}]});
+parse_rep_db(<<"http://", _/binary>> = Url) ->
+    parse_rep_db({[{<<"url">>, Url}]});
+parse_rep_db(<<"https://", _/binary>> = Url) ->
+    parse_rep_db({[{<<"url">>, Url}]});
 parse_rep_db(<<DbName/binary>>) ->
     DbName.
 
 
 convert_options([])->
     [];
-convert_options([{<<"cancel">>, V}|R])->
-    [{cancel, V}|convert_options(R)];
-convert_options([{<<"create_target">>, V}|R])->
-    [{create_target, V}|convert_options(R)];
-convert_options([{<<"continuous">>, V}|R])->
-    [{continuous, V}|convert_options(R)];
-convert_options([{<<"filter">>, V}|R])->
-    [{filter, V}|convert_options(R)];
-convert_options([{<<"query_params">>, V}|R])->
-    [{query_params, V}|convert_options(R)];
-convert_options([_|R])-> % skip unknown option
+convert_options([{<<"cancel">>, V} | R]) ->
+    [{cancel, V} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | R]) ->
+    [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | R]) ->
+    [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+    [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+    [{query_params, V} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
 
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=960398&r1=960397&r2=960398&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sun Jul  4 21:16:42 2010
@@ -51,7 +51,7 @@ start(Src, Tgt, Options, UserCtx) ->
     
     % initalize the replication state, looking for existing rep records
     % for incremental replication.
-    #rep_state{source=Source,target=Target,start_seq=StartSeq} = State = 
+    #rep_state{source=Source, target=Target, start_seq=StartSeq} = State =
             init_state(Src, Tgt, Options, UserCtx), 
     
     % Create the work queues
@@ -117,7 +117,7 @@ init_state(Src,Tgt,Options,UserCtx)->   
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo)
     },
-    State#rep_state{timer = erlang:start_timer(checkpoint_interval(State), 
+    State#rep_state{timer = erlang:start_timer(checkpoint_interval(State),
             self(), timed_checkpoint)}.
 
 
@@ -125,7 +125,7 @@ spawn_changes_reader(Cp, StartSeq, Sourc
     spawn_link(
         fun()->
             couch_api_wrap:changes_since(Source, all_docs, StartSeq,
-                fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
+                fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo, _) ->
                     Cp ! {seq_start, {Seq, length(Revs)}},
                     Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
                     ok = couch_work_queue:queue(ChangesQueue, DocInfo),
@@ -153,7 +153,7 @@ missing_revs_finder_loop(Cp, 
         couch_work_queue:close(MissingRevsQueue);
     {ok, DocInfos} ->
         IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
-                #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
+                #doc_info{id=Id, revs=RevsInfo} <- DocInfos],
         {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
         % Figured out which on the target are missing.
         % Missing contains the id and revs missing, and any possible
@@ -165,29 +165,27 @@ missing_revs_finder_loop(Cp, 
         % now complete.
         IdRevsSeqDict = dict:from_list(
             [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
-                    #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
+                    #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
         NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
         % signal the completion of these that aren't missing
-        lists:foreach(fun({_Id, {Revs, Seq}})->
+        lists:foreach(fun({_Id, {Revs, Seq}}) ->
                 Cp ! {seq_changes_done, {Seq, length(Revs)}}
             end, dict:to_list(NonMissingIdRevsSeqDict)),
 
         % Expand out each docs and seq into it's own work item
-        lists:foreach(fun({Id, Revs, PAs})->
+        lists:foreach(fun({Id, Revs, PAs}) ->
             % PA means "possible ancestor"
             Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
             {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
-            ok = couch_work_queue:queue(MissingRevsQueue,
-                {Id, Revs, PAs, Seq})
+            ok = couch_work_queue:queue(MissingRevsQueue, {Id, Revs, PAs, Seq})
             end, Missing),
-        missing_revs_finder_loop(Cp, Target, ChangesQueue, 
-                MissingRevsQueue)
+        missing_revs_finder_loop(Cp, Target, ChangesQueue, MissingRevsQueue)
     end.
 
 
 remove_missing(IdRevsSeqDict, []) ->
     IdRevsSeqDict;
-remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
+remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
     {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
     case AllChangedRevs -- MissingRevs of
     [] ->
@@ -213,28 +211,28 @@ doc_copy_loop(Cp, Source, Target, Missin
     closed ->
         Cp ! done;
     {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+        DocFun = fun({ok, Doc}, _) ->
+            % we are called for every rev read on the source
+            Cp ! {add_stat, {#stats.docs_read, 1}},
+            % now write the doc to the target.
+            case couch_api_wrap:update_doc(Target, Doc, [],
+                replicated_changes) of
+            {ok, _} ->
+                Cp ! {add_stat, {#stats.docs_written, 1}};
+            _Error ->
+                Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+            end;
+        (_, _) ->
+            ok
+        end,
         couch_api_wrap:open_doc_revs(Source, Id, Revs,
-                [{atts_since,PossibleAncestors}],
-                fun({ok, Doc}, _) ->
-                    % we are called for every rev read on the source
-                    Cp ! {add_stat, {#stats.docs_read, 1}},
-                    % now write the doc to the target.
-                    case couch_api_wrap:update_doc(Target, Doc, [],
-                            replicated_changes) of
-                    {ok, _} ->
-                        Cp ! {add_stat, {#stats.docs_written, 1}};
-                    _Error ->
-                        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
-                    end;
-                (_, _) ->
-                    ok
-                end, []),
+            [{atts_since, PossibleAncestors}], DocFun, []),
         Cp ! {seq_changes_done, {Seq, length(Revs)}},
         doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
     end.
 
 checkpoint_loop(State, SeqsInProgress, Stats) ->
-    % SeqsInProgress contains the number of revs for each seq foiund by the
+    % SeqsInProgress contains the number of revs for each seq found by the
     % changes process.
     receive
     {seq_start, {Seq, NumChanges}} ->
@@ -246,7 +244,7 @@ checkpoint_loop(State, SeqsInProgress, S
         TotalChanges = gb_trees:get(Seq, SeqsInProgress),
         case TotalChanges - NumChangesDone of
         0 ->
-            % this seq is completely processed. Chck to see if it was the
+            % this seq is completely processed. Check to see if it was the
             % smallest seq in progess. If so, we've made progress that can
             % be checkpointed.
             State2 =
@@ -256,8 +254,7 @@ checkpoint_loop(State, SeqsInProgress, S
             _ ->
                 State
             end,
-            checkpoint_loop(State2, 
-                    gb_trees:delete(Seq,SeqsInProgress), Stats);
+            checkpoint_loop(State2, gb_trees:delete(Seq,SeqsInProgress), Stats);
         NewTotalChanges when NewTotalChanges > 0 ->
             % Still some changes that need work done. Put the new count back.
             SeqsInProgress2 =
@@ -283,7 +280,7 @@ checkpoint_loop(State, SeqsInProgress, S
         % every checkpoint interval while processing
         State2 = do_checkpoint(State, Stats),
         Timer = erlang:start_timer(checkpoint_interval(State), 
-                self(), timed_checkpoint),
+            self(), timed_checkpoint),
         checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats)
     end.
 
@@ -291,7 +288,7 @@ checkpoint_loop(State, SeqsInProgress, S
 checkpoint_interval(_State) ->
     5000.
 
-do_checkpoint(#rep_state{current_through_seq=Seq,committed_seq=OldSeq}=State,
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=OldSeq} = State,
         _Stats) when Seq == OldSeq ->
     State;
 do_checkpoint(State, Stats) ->
@@ -335,20 +332,20 @@ do_checkpoint(State, Stats) ->
         ]},
 
         try
-        {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source, 
+            {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source,
                 SourceLog#doc{body=NewRepHistory}, [delay_commit]),
-        {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target, 
+            {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target,
                 TargetLog#doc{body=NewRepHistory}, [delay_commit]),
-        State#rep_state{
-            checkpoint_history = NewRepHistory,
-            committed_seq = NewSeq,
-            source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-            target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-        }
+            State#rep_state{
+                checkpoint_history = NewRepHistory,
+                committed_seq = NewSeq,
+                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+            }
         catch throw:conflict ->
-        ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
-            "yourself?)", []),
-        State
+            ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+                "yourself?)", []),
+            State
         end;
     _Else ->
         ?LOG_INFO("rebooting ~p -> ~p from last known replication checkpoint",
@@ -360,8 +357,10 @@ do_checkpoint(State, Stats) ->
 commit_to_both(Source, Target) ->
     % commit the src async
     ParentPid = self(),
-    SrcCommitPid = spawn_link(fun() ->
-            ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)} end),
+    SrcCommitPid = spawn_link(
+        fun() ->
+            ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)}
+        end),
 
     % commit tgt sync
     {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target),
@@ -393,7 +392,7 @@ make_replication_id(Source, Target, User
         end,
     couch_util:to_hex(erlang:md5(term_to_binary(Base))).
 
-get_rep_endpoint(_UserCtx, #httpdb{url=Url,headers=Headers,oauth=OAuth}) ->
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
     case OAuth of
     nil ->
         {remote, Url, Headers};
@@ -428,7 +427,7 @@ compare_replication_logs(SrcDoc, TgtDoc)
 compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
     ?LOG_INFO("no common ancestry -- performing full replication", []),
     {0, []};
-compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
     SourceId = couch_util:get_value(<<"session_id">>, S),
     case has_session_id(SourceId, Target) of
     true ->



Mime
View raw message