Added: couchdb/trunk/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.erl (added)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,714 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap).
+
+% This module wraps the native Erlang API, allowing the same operations to
+% be performed on both local and remote databases through a single API.
+%
+% Notes:
+% Many options and APIs aren't supported here yet; they are added as needed.
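+%
+% A minimal usage sketch (names and the URL are illustrative; in practice
+% the replicator also fills in #httpdb fields such as timeout,
+% http_connections and http_pipeline_size before opening). Remote and local
+% databases go through the same calls:
+%
+%   {ok, RemoteDb} = couch_api_wrap:db_open(
+%       #httpdb{url = "http://localhost:5984/mydb/"}, []),
+%   {ok, LocalDb} = couch_api_wrap:db_open(<<"mydb">>, [{user_ctx, UserCtx}]),
+%   {ok, Info} = couch_api_wrap:get_db_info(RemoteDb),
+%   ok = couch_api_wrap:db_close(RemoteDb)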
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+
+-export([
+ db_open/2,
+ db_open/3,
+ db_close/1,
+ get_db_info/1,
+ update_doc/3,
+ update_doc/4,
+ update_docs/3,
+ update_docs/4,
+ ensure_full_commit/1,
+ get_missing_revs/2,
+ open_doc/3,
+ open_doc_revs/6,
+ changes_since/5,
+ db_uri/1
+ ]).
+
+-import(couch_api_wrap_httpc, [
+    send_req/3
+    ]).
+
+-import(couch_util, [
+ encode_doc_id/1,
+ get_value/2,
+ get_value/3
+ ]).
+
+
+db_uri(#httpdb{url = Url}) ->
+ couch_util:url_strip_password(Url);
+
+db_uri(#db{name = Name}) ->
+ db_uri(Name);
+
+db_uri(DbName) ->
+ ?b2l(DbName).
+
+
+db_open(Db, Options) ->
+ db_open(Db, Options, false).
+
+db_open(#httpdb{} = Db1, _Options, Create) ->
+ {ok, Db} = couch_api_wrap_httpc:setup(Db1),
+ case Create of
+ false ->
+ ok;
+ true ->
+ send_req(Db, [{method, put}, {direct, true}], fun(_, _, _) -> ok end)
+ end,
+ send_req(Db, [{method, head}],
+ fun(200, _, _) ->
+ {ok, Db};
+ (401, _, _) ->
+ throw({unauthorized, ?l2b(db_uri(Db))});
+ (_, _, _) ->
+ throw({db_not_found, ?l2b(db_uri(Db))})
+ end);
+db_open(DbName, Options, Create) ->
+ try
+ case Create of
+ false ->
+ ok;
+ true ->
+ ok = couch_httpd:verify_is_server_admin(
+ get_value(user_ctx, Options)),
+ couch_db:create(DbName, Options)
+ end,
+ case couch_db:open(DbName, Options) of
+ {not_found, _Reason} ->
+ throw({db_not_found, DbName});
+ {ok, _Db} = Success ->
+ Success
+ end
+ catch
+ throw:{unauthorized, _} ->
+ throw({unauthorized, DbName})
+ end.
+
+db_close(#httpdb{httpc_pool = Pool}) ->
+ unlink(Pool),
+ ok = couch_httpc_pool:stop(Pool);
+db_close(DbName) ->
+ couch_db:close(DbName).
+
+
+get_db_info(#httpdb{} = Db) ->
+ send_req(Db, [],
+ fun(200, _, {Props}) ->
+ {ok, Props}
+ end);
+get_db_info(Db) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+
+
+ensure_full_commit(#httpdb{} = Db) ->
+ send_req(
+ Db,
+ [{method, post}, {path, "_ensure_full_commit"}, {direct, true},
+ {headers, [{"Content-Type", "application/json"}]}],
+ fun(201, _, {Props}) ->
+ {ok, get_value(<<"instance_start_time">>, Props)}
+ end);
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+
+get_missing_revs(#httpdb{} = Db, IdRevs) ->
+ JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
+ send_req(
+ Db,
+ [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
+ fun(200, _, {Props}) ->
+ ConvertToNativeFun = fun({Id, {Result}}) ->
+ MissingRevs = couch_doc:parse_revs(
+ get_value(<<"missing">>, Result)
+ ),
+ PossibleAncestors = couch_doc:parse_revs(
+ get_value(<<"possible_ancestors">>, Result, [])
+ ),
+ {Id, MissingRevs, PossibleAncestors}
+ end,
+ {ok, lists:map(ConvertToNativeFun, Props)}
+ end);
+get_missing_revs(Db, IdRevs) ->
+ couch_db:get_missing_revs(Db, IdRevs).
+
+
+
+open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
+ Path = encode_doc_id(Id),
+ QArgs = options_to_query_args(
+ HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ Self = self(),
+ Streamer = spawn_link(fun() ->
+ send_req(
+ HttpDb,
+ [{path, Path}, {qs, QArgs},
+ {ibrowse_options, [{stream_to, {self(), once}}]},
+ {headers, [{"Accept", "multipart/mixed"}]}],
+ fun(200, Headers, StreamDataFun) ->
+ remote_open_doc_revs_streamer_start(Self),
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ get_value("Content-Type", Headers),
+ StreamDataFun,
+ fun mp_parse_mixed/1)
+ end),
+ unlink(Self)
+ end),
+ receive
+ started ->
+ receive_docs_loop(Streamer, Fun, Acc)
+ end;
+open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
+ {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
+ {ok, lists:foldl(Fun, Acc, Results)}.
+
+
+open_doc(#httpdb{} = Db, Id, Options) ->
+ send_req(
+ Db,
+ [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
+ fun(200, _, Body) ->
+ {ok, couch_doc:from_json_obj(Body)};
+ (_, _, {Props}) ->
+ {error, get_value(<<"error">>, Props)}
+ end);
+open_doc(Db, Id, Options) ->
+ case couch_db:open_doc(Db, Id, Options) of
+ {ok, _} = Ok ->
+ Ok;
+ {not_found, _Reason} ->
+ {error, <<"not_found">>}
+ end.
+
+
+update_doc(Db, Doc, Options) ->
+ update_doc(Db, Doc, Options, interactive_edit).
+
+update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
+ QArgs = case Type of
+ replicated_changes ->
+ [{"new_edits", "false"}];
+ _ ->
+ []
+ end ++ options_to_query_args(Options, []),
+ Boundary = couch_uuids:random(),
+ JsonBytes = ?JSON_ENCODE(
+ couch_doc:to_json_obj(
+ Doc, [revs, attachments, follows, att_encoding_info | Options])),
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+ JsonBytes, Doc#doc.atts, true),
+ Headers = case lists:member(delay_commit, Options) of
+ true ->
+ [{"X-Couch-Full-Commit", "false"}];
+ false ->
+ []
+ end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
+ Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
+ send_req(
+ HttpDb,
+ [{method, put}, {path, encode_doc_id(DocId)},
+ {direct, Type =:= interactive_edit}, {qs, QArgs},
+ {headers, Headers}, {body, Body}],
+ fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
+ {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
+ (_, _, {Props}) ->
+ {error, get_value(<<"error">>, Props)}
+ end);
+update_doc(Db, Doc, Options, Type) ->
+ try
+ couch_db:update_doc(Db, Doc, Options, Type)
+ catch
+ throw:{unauthorized, _} ->
+ {error, <<"unauthorized">>}
+ end.
+
+
+update_docs(Db, DocList, Options) ->
+ update_docs(Db, DocList, Options, interactive_edit).
+
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
+ FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+ Prefix1 = case UpdateType of
+ replicated_changes ->
+ {prefix, <<"{\"new_edits\":false,\"docs\":[">>};
+ interactive_edit ->
+ {prefix, <<"{\"docs\":[">>}
+ end,
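+    % BodyFun feeds ibrowse's chunked transfer encoding: each call gets the
+    % remaining state (the JSON prefix followed by the docs still to send)
+    % and returns the next body chunk plus the new state, until eof.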
+ BodyFun = fun(eof) ->
+ eof;
+ ([]) ->
+ {ok, <<"]}">>, eof};
+ ([{prefix, Prefix} | Rest]) ->
+ {ok, Prefix, Rest};
+ ([Doc]) when is_record(Doc, doc) ->
+ DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
+ {ok, ?JSON_ENCODE(DocJson), []};
+ ([Doc | RestDocs]) when is_record(Doc, doc) ->
+ DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
+ {ok, [?JSON_ENCODE(DocJson), ","], RestDocs};
+ ([Doc]) ->
+ % IO list
+ {ok, Doc, []};
+ ([Doc | RestDocs]) ->
+ % IO list
+ {ok, [Doc, ","], RestDocs}
+ end,
+ send_req(
+ HttpDb,
+ [{method, post}, {path, "_bulk_docs"},
+ {body, {BodyFun, [Prefix1 | DocList]}},
+ {ibrowse_options, [{transfer_encoding, chunked}]},
+ {headers, [
+ {"X-Couch-Full-Commit", FullCommit},
+ {"Content-Type", "application/json"} ]}],
+ fun(201, _, Results) when is_list(Results) ->
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (417, _, Results) when is_list(Results) ->
+ {ok, bulk_results_to_errors(DocList, Results, remote)}
+ end);
+update_docs(Db, DocList, Options, UpdateType) ->
+ Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
+ {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+
+
+changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
+ UserFun, Options) ->
+ BaseQArgs = [
+ {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)}
+ ],
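+    % When replicating by doc IDs, use the server-side _doc_ids filter and
+    % send the ID list as a JSON body via POST, since the list may be too
+    % long to fit safely in a query string.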
+ {QArgs, Method, Body, Headers} = case get_value(doc_ids, Options) of
+ undefined ->
+ QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
+ {QArgs1, get, [], Headers1};
+ DocIds ->
+ Headers2 = [{"Content-Type", "application/json"} | Headers1],
+ JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
+ {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
+ end,
+ send_req(
+ HttpDb,
+ [{method, Method}, {path, "_changes"}, {qs, QArgs},
+ {headers, Headers}, {body, Body},
+ {ibrowse_options, [{stream_to, {self(), once}}]}],
+ fun(200, _, DataStreamFun) ->
+ case couch_util:get_value(continuous, Options, false) of
+ true ->
+ continuous_changes(DataStreamFun, UserFun);
+ false ->
+ EventFun = fun(Ev) ->
+ changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+ end,
+ json_stream_parse:events(DataStreamFun, EventFun)
+ end
+ end);
+changes_since(Db, Style, StartSeq, UserFun, Options) ->
+ Filter = case get_value(doc_ids, Options) of
+ undefined ->
+ ?b2l(get_value(filter, Options, <<>>));
+ _DocIds ->
+ "_doc_ids"
+ end,
+ Args = #changes_args{
+ style = Style,
+ since = StartSeq,
+ filter = Filter,
+ feed = case get_value(continuous, Options, false) of
+ true ->
+ "continuous";
+ false ->
+ "normal"
+ end,
+ timeout = infinity
+ },
+ QueryParams = get_value(query_params, Options, {[]}),
+ Req = changes_json_req(Db, Filter, QueryParams, Options),
+ ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+ ChangesFeedFun(fun({change, Change, _}, _) ->
+ UserFun(json_to_doc_info(Change));
+ (_, _) ->
+ ok
+ end).
+
+
+% internal functions
+
+maybe_add_changes_filter_q_args(BaseQS, Options) ->
+ case get_value(filter, Options) of
+ undefined ->
+ BaseQS;
+ FilterName ->
+ {Params} = get_value(query_params, Options, {[]}),
+ [{"filter", ?b2l(FilterName)} | lists:foldl(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case lists:keymember(Ks, 1, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, couch_util:to_list(V)} | QSAcc]
+ end
+ end,
+ BaseQS, Params)]
+ end ++
+ case get_value(continuous, Options, false) of
+ false ->
+ [{"feed", "normal"}];
+ true ->
+ [{"feed", "continuous"}, {"heartbeat", "10000"}]
+ end.
+
+changes_json_req(_Db, "", _QueryParams, _Options) ->
+ {[]};
+changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
+ {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}.
+
+
+options_to_query_args(HttpDb, Path, Options) ->
+ case lists:keytake(atts_since, 1, Options) of
+ false ->
+ options_to_query_args(Options, []);
+ {value, {atts_since, []}, Options2} ->
+ options_to_query_args(Options2, []);
+ {value, {atts_since, PAs}, Options2} ->
+ QueryArgs1 = options_to_query_args(Options2, []),
+ FullUrl = couch_api_wrap_httpc:full_url(
+ HttpDb, [{path, Path}, {qs, QueryArgs1}]),
+ RevList = atts_since_arg(
+ length(FullUrl) + length("&atts_since=[]"), PAs, []),
+ [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
+ end.
+
+
+options_to_query_args([], Acc) ->
+ lists:reverse(Acc);
+options_to_query_args([delay_commit | Rest], Acc) ->
+ options_to_query_args(Rest, Acc);
+options_to_query_args([revs | Rest], Acc) ->
+ options_to_query_args(Rest, [{"revs", "true"} | Acc]);
+options_to_query_args([{open_revs, all} | Rest], Acc) ->
+ options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
+ JsonRevs = ?JSON_ENCODE(couch_doc:revs_to_strs(Revs)),
+ options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
+
+
+-define(MAX_URL_LEN, 8192).
+
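+% Grow the atts_since revision list only while the estimated URL length
+% stays under ?MAX_URL_LEN; any remaining ancestors are dropped, at the
+% cost of possibly transferring attachment data the target already has.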
+atts_since_arg(_UrlLen, [], Acc) ->
+ lists:reverse(Acc);
+atts_since_arg(UrlLen, [PA | Rest], Acc) ->
+ RevStr = couch_doc:rev_to_str(PA),
+ NewUrlLen = case Rest of
+ [] ->
+ UrlLen + size(RevStr) + 2; % plus 2 double quotes
+ _ ->
+ UrlLen + size(RevStr) + 3 % plus 2 double quotes and a comma
+ end,
+ case NewUrlLen > ?MAX_URL_LEN of
+ true ->
+ lists:reverse(Acc);
+ false ->
+ atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
+ end.
+
+
+% TODO: A less verbose, more elegant and automatic restart strategy for
+% the exported open_doc_revs/6 function. The restart should be
+% transparent to the caller, like any other Couch API function exported
+% by this module.
+receive_docs_loop(Streamer, Fun, Acc) ->
+ try
+ receive_docs(Streamer, Fun, Acc)
+ catch
+ throw:restart ->
+ receive_docs_loop(Streamer, Fun, Acc)
+ end.
+
+receive_docs(Streamer, UserFun, UserAcc) ->
+ Streamer ! {get_headers, self()},
+ receive
+ started ->
+ restart_remote_open_doc_revs();
+ {headers, Headers} ->
+ case get_value("content-type", Headers) of
+ {"multipart/related", _} = ContentType ->
+ case doc_from_multi_part_stream(
+ ContentType, fun() -> receive_doc_data(Streamer) end) of
+ {ok, Doc} ->
+ UserAcc2 = UserFun({ok, Doc}, UserAcc),
+ receive_docs(Streamer, UserFun, UserAcc2)
+ end;
+ {"application/json", []} ->
+ Doc = couch_doc:from_json_obj(
+ ?JSON_DECODE(receive_all(Streamer, []))),
+ UserAcc2 = UserFun({ok, Doc}, UserAcc),
+ receive_docs(Streamer, UserFun, UserAcc2);
+ {"application/json", [{"error","true"}]} ->
+ {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, [])),
+ Rev = get_value(<<"missing">>, ErrorProps),
+ Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
+ UserAcc2 = UserFun(Result, UserAcc),
+ receive_docs(Streamer, UserFun, UserAcc2)
+ end;
+ done ->
+ {ok, UserAcc}
+ end.
+
+
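+% Drain any messages left over from a previous (restarted) streamer so the
+% next attempt starts with a clean mailbox, then signal the restart by
+% throwing; the throw is caught in receive_docs_loop/3.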
+restart_remote_open_doc_revs() ->
+ receive
+ {body_bytes, _} ->
+ restart_remote_open_doc_revs();
+ body_done ->
+ restart_remote_open_doc_revs();
+ done ->
+ restart_remote_open_doc_revs();
+ {headers, _} ->
+ restart_remote_open_doc_revs();
+ started ->
+ restart_remote_open_doc_revs()
+ after 0 ->
+ throw(restart)
+ end.
+
+
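+% Flush requests addressed to a previous streamer instance before telling
+% the parent that this streamer is ready.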
+remote_open_doc_revs_streamer_start(Parent) ->
+ receive
+ {get_headers, _} ->
+ remote_open_doc_revs_streamer_start(Parent);
+ {next_bytes, _} ->
+ remote_open_doc_revs_streamer_start(Parent)
+ after 0 ->
+ Parent ! started
+ end.
+
+
+receive_all(Streamer, Acc) ->
+ Streamer ! {next_bytes, self()},
+ receive
+ started ->
+ restart_remote_open_doc_revs();
+ {body_bytes, Bytes} ->
+ receive_all(Streamer, [Bytes | Acc]);
+ body_done ->
+ lists:reverse(Acc)
+ end.
+
+
+mp_parse_mixed(eof) ->
+ receive {get_headers, From} ->
+ From ! done
+ end;
+mp_parse_mixed({headers, H}) ->
+ receive {get_headers, From} ->
+ From ! {headers, H}
+ end,
+ fun mp_parse_mixed/1;
+mp_parse_mixed({body, Bytes}) ->
+ receive {next_bytes, From} ->
+ From ! {body_bytes, Bytes}
+ end,
+ fun mp_parse_mixed/1;
+mp_parse_mixed(body_end) ->
+ receive {next_bytes, From} ->
+ From ! body_done;
+ {get_headers, From} ->
+ self() ! {get_headers, From}
+ end,
+ fun mp_parse_mixed/1.
+
+
+receive_doc_data(Streamer) ->
+ Streamer ! {next_bytes, self()},
+ receive
+ {body_bytes, Bytes} ->
+ {Bytes, fun() -> receive_doc_data(Streamer) end};
+ body_done ->
+ {<<>>, fun() -> receive_doc_data(Streamer) end}
+ end.
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+ Self = self(),
+ Parser = spawn_link(fun() ->
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ ContentType, DataFun,
+ fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
+ unlink(Self)
+ end),
+ Parser ! {get_doc_bytes, self()},
+ receive
+ started ->
+ unlink(Parser),
+ exit(Parser, kill),
+ restart_remote_open_doc_revs();
+ {doc_bytes, DocBytes} ->
+ Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, self()},
+ receive
+ started ->
+ unlink(Parser),
+ exit(Parser, kill),
+ restart_remote_open_doc_revs();
+ {bytes, Bytes} ->
+ Bytes
+ end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data = follows} = A) ->
+ A#att{data = ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ {ok, Doc#doc{atts = Atts2}}
+ end.
+
+
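+% Continuation-style event handlers for json_stream_parse:events/2: each
+% function consumes one parser event and returns the fun that handles the
+% next one, descending into the "results" array and collecting one object
+% per change row.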
+changes_ev1(object_start, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
+changes_ev2(_, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev3(array_start, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
+
+changes_ev_loop(object_start, UserFun, UserAcc) ->
+ fun(Ev) ->
+ json_stream_parse:collect_object(Ev,
+ fun(Obj) ->
+ UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
+ fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
+ end)
+ end;
+changes_ev_loop(array_end, _UserFun, _UserAcc) ->
+ fun(_Ev) -> changes_ev_done() end.
+
+changes_ev_done() ->
+ fun(_Ev) -> changes_ev_done() end.
+
+continuous_changes(DataFun, UserFun) ->
+ {DataFun2, _, Rest} = json_stream_parse:events(
+ DataFun,
+ fun(Ev) -> parse_changes_line(Ev, UserFun) end),
+ continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+parse_changes_line(object_start, UserFun) ->
+ fun(Ev) ->
+ json_stream_parse:collect_object(Ev,
+ fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
+ end.
+
+json_to_doc_info({Props}) ->
+ RevsInfo = lists:map(
+ fun({Change}) ->
+ Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
+ Del = (true =:= get_value(<<"deleted">>, Change)),
+ #rev_info{rev=Rev, deleted=Del}
+ end, get_value(<<"changes">>, Props)),
+ #doc_info{
+ id = get_value(<<"id">>, Props),
+ high_seq = get_value(<<"seq">>, Props),
+ revs = RevsInfo
+ }.
+
+
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
+ lists:reverse(lists:foldl(
+ fun({_, {ok, _}}, Acc) ->
+ Acc;
+ ({#doc{id = Id}, Error}, Acc) ->
+ {_, Error, _Reason} = couch_httpd:error_info(Error),
+ [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+ end,
+ [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
+ bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
+ lists:map(
+ fun({{Id, _Rev}, Err}) ->
+ {_, Error, _Reason} = couch_httpd:error_info(Err),
+ {[{<<"id">>, Id}, {<<"error">>, Error}]}
+ end,
+ Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->
+ lists:reverse(lists:foldl(
+ fun({Props}, Acc) ->
+ case get_value(<<"error">>, Props, get_value(error, Props)) of
+ undefined ->
+ Acc;
+ Error ->
+ Id = get_value(<<"id">>, Props, get_value(id, Props)),
+ [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+ end
+ end,
+ [], Results)).
+
+
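+% Body generator for the multipart PUT issued by update_doc/4. The first
+% call kills any stale streamer left over from a retried request, spawns a
+% process that produces the multipart stream, and stashes its pid in the
+% process dictionary, keyed by the boundary; subsequent calls pull data
+% from it until Len bytes have been emitted.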
+stream_doc({JsonBytes, Atts, Boundary, Len}) ->
+ case erlang:erase({doc_streamer, Boundary}) of
+ Pid when is_pid(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill);
+ _ ->
+ ok
+ end,
+ Self = self(),
+ DocStreamer = spawn_link(fun() ->
+ couch_doc:doc_to_multi_part_stream(
+ Boundary, JsonBytes, Atts,
+ fun(Data) ->
+ receive {get_data, From} ->
+ From ! {data, Data}
+ end
+ end, true),
+ unlink(Self)
+ end),
+ erlang:put({doc_streamer, Boundary}, DocStreamer),
+ {ok, <<>>, {Len, Boundary}};
+stream_doc({0, Id}) ->
+ erlang:erase({doc_streamer, Id}),
+ eof;
+stream_doc({LenLeft, Id}) when LenLeft > 0 ->
+ erlang:get({doc_streamer, Id}) ! {get_data, self()},
+ receive {data, Data} ->
+ {ok, Data, {LenLeft - iolist_size(Data), Id}}
+ end.
Added: couchdb/trunk/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.hrl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.hrl (added)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.hrl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,37 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+-record(httpdb, {
+ url,
+ oauth = nil,
+ headers = [
+ {"Accept", "application/json"},
+ {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+ ],
+ timeout, % milliseconds
+ ibrowse_options = [],
+ retries = 10,
+ wait = 250, % milliseconds
+ httpc_pool = nil,
+    http_connections,    % max number of pooled connections
+    http_pipeline_size   % max number of pipelined requests per connection
+}).
+
+-record(oauth, {
+ consumer_key,
+ token,
+ token_secret,
+ consumer_secret,
+ signature_method
+}).
Added: couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl (added)
+++ couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,308 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap_httpc).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([setup/1]).
+-export([send_req/3]).
+-export([full_url/2]).
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+-define(RETRY_LATER_WAIT, 50).
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+
+setup(#httpdb{httpc_pool = nil, url = Url, ibrowse_options = IbrowseOptions,
+ http_connections = MaxConns, http_pipeline_size = PipeSize} = Db) ->
+ HttpcPoolOptions = [
+ {ssl_options, get_value(ssl_options, IbrowseOptions, [])},
+ {max_piped_connections, MaxConns},
+ {pipeline_size, PipeSize}
+ ],
+ {ok, Pid} = couch_httpc_pool:start_link(
+ ibrowse_lib:parse_url(Url), HttpcPoolOptions),
+ {ok, Db#httpdb{httpc_pool = Pid}}.
+
+
+send_req(HttpDb, Params1, Callback) ->
+ Params2 = ?replace(Params1, qs,
+ [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
+ Params = ?replace(Params2, ibrowse_options,
+ lists:keysort(1, get_value(ibrowse_options, Params2, []))),
+ {Worker, Response} = send_ibrowse_req(HttpDb, Params),
+ process_response(Response, Worker, HttpDb, Params, Callback).
+
+
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+ Method = get_value(method, Params, get),
+ UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
+ Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
+ Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
+ Url = full_url(HttpDb, Params),
+ Body = get_value(body, Params, []),
+ {_Type, WorkerPid} = Worker =
+ case get_value(path, Params) of
+ "_changes" ->
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+ {ibrowse_direct, Pid};
+ _ ->
+            % Direct means no use of the HTTP pipeline. As section 8.1.2.2
+            % of RFC 2616 says, clients should not pipeline non-idempotent
+            % requests. Let the caller explicitly say which requests are not
+            % idempotent. For example, POSTs against "/some_db/_revs_diff"
+            % are idempotent (despite the verb not being GET).
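+            % For instance, ensure_full_commit/1 in couch_api_wrap marks its
+            % POST as direct:
+            %
+            %   send_req(Db, [{method, post},
+            %       {path, "_ensure_full_commit"}, {direct, true}, ...], Fun)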
+ case get_value(direct, Params, false) of
+ true ->
+ {ok, Pid} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool),
+ {direct, Pid};
+ false ->
+ Pid = get_piped_worker(HttpDb),
+ {piped, Pid}
+ end
+ end,
+ IbrowseOptions = [
+ {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
+ lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
+ HttpDb#httpdb.ibrowse_options)
+ ],
+ Response = ibrowse:send_req_direct(
+ WorkerPid, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+ {Worker, Response}.
+
+
+get_piped_worker(#httpdb{httpc_pool = Pool} = HttpDb) ->
+ case couch_httpc_pool:get_piped_worker(Pool) of
+ {ok, Worker} ->
+ Worker;
+ retry_later ->
+ ok = timer:sleep(?RETRY_LATER_WAIT),
+ get_piped_worker(HttpDb)
+ end.
+
+
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
+ ok = timer:sleep(?RETRY_LATER_WAIT),
+ send_req(HttpDb, Params, Callback);
+
+process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+    % The ibrowse worker terminated because the remote peer closed the
+    % socket; this is not an error, so simply retry the request.
+ send_req(HttpDb, Params, Cb);
+
+process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
+ process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
+
+process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
+ stop_worker(Worker, HttpDb),
+ case list_to_integer(Code) of
+ Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+ EJson = case Body of
+ <<>> ->
+ null;
+ Json ->
+ ?JSON_DECODE(Json)
+ end,
+ Callback(Ok, Headers, EJson);
+ R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+ Error ->
+ maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+ end;
+
+process_response(Error, Worker, HttpDb, Params, Callback) ->
+ maybe_retry(Error, Worker, HttpDb, Params, Callback).
+
+
+process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
+ receive
+ {ibrowse_async_headers, ReqId, Code, Headers} ->
+ case list_to_integer(Code) of
+ Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+ StreamDataFun = fun() ->
+ stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+ end,
+ ibrowse:stream_next(ReqId),
+ try
+ Ret = Callback(Ok, Headers, StreamDataFun),
+ stop_worker(Worker, HttpDb),
+ clean_mailbox_req(ReqId),
+ Ret
+ catch throw:{maybe_retry_req, Err} ->
+ clean_mailbox_req(ReqId),
+ maybe_retry(Err, Worker, HttpDb, Params, Callback)
+ end;
+ R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+ Error ->
+ report_error(Worker, HttpDb, Params, {code, Error})
+ end;
+ {ibrowse_async_response, ReqId, {error, _} = Error} ->
+ maybe_retry(Error, Worker, HttpDb, Params, Callback)
+ end.
+
+
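+% Drop any ibrowse messages still queued for this request, so they don't
+% leak into the caller's mailbox after the response has been handled.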
+clean_mailbox_req(ReqId) ->
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox_req(ReqId);
+ {ibrowse_async_response_end, ReqId} ->
+ clean_mailbox_req(ReqId)
+ after 0 ->
+ ok
+ end.
+
+
+stop_worker({ibrowse_direct, Worker}, _HttpDb) ->
+ unlink(Worker),
+ receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
+ catch ibrowse:stop_worker_process(Worker);
+stop_worker({direct, Worker}, #httpdb{httpc_pool = Pool}) ->
+ ok = couch_httpc_pool:release_worker(Pool, Worker);
+stop_worker({piped, _Worker}, _HttpDb) ->
+ ok.
+
+
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+ report_error(Worker, HttpDb, Params, {error, Error});
+
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+ Params, Cb) ->
+ stop_worker(Worker, HttpDb),
+ Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+ Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+ ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
+ [Method, Url, Wait / 1000, error_cause(Error)]),
+ ok = timer:sleep(Wait),
+ send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait * 2}, Params, Cb).
+
+
+report_error(Worker, HttpDb, Params, Error) ->
+ Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+ Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+ do_report_error(Url, Method, Error),
+ stop_worker(Worker, HttpDb),
+ exit({http_request_failed, Method, Url, Error}).
+
+
+do_report_error(Url, Method, {code, Code}) ->
+    ?LOG_ERROR("Replicator: request ~s to ~s failed. The received "
+        "HTTP error code is ~p", [Method, Url, Code]);
+
+do_report_error(FullUrl, Method, Error) ->
+    ?LOG_ERROR("Replicator: request ~s to ~s failed due to error ~s",
+        [Method, FullUrl, error_cause(Error)]).
+
+
+error_cause({throw, {error, Cause}}) ->
+ lists:flatten(io_lib:format("~p", [Cause]));
+error_cause({error, Cause}) ->
+ lists:flatten(io_lib:format("~p", [Cause]));
+error_cause(Cause) ->
+ lists:flatten(io_lib:format("~p", [Cause])).
+
+
+stream_data_self(HttpDb, Params, Worker, ReqId, Cb) ->
+ receive
+ {ibrowse_async_response, ReqId, {error, Error}} ->
+ throw({maybe_retry_req, Error});
+ {ibrowse_async_response, ReqId, <<>>} ->
+ ibrowse:stream_next(ReqId),
+ stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
+ {ibrowse_async_response, ReqId, Data} ->
+ ibrowse:stream_next(ReqId),
+ {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+ {ibrowse_async_response_end, ReqId} ->
+ {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+ end.
+
+
+full_url(#httpdb{url = BaseUrl}, Params) ->
+ Path = get_value(path, Params, []),
+ QueryArgs = get_value(qs, Params, []),
+ BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).
+
+
+query_args_to_string([], []) ->
+ "";
+query_args_to_string([], Acc) ->
+ "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K, V} | Rest], Acc) ->
+ query_args_to_string(Rest, [K ++ "=" ++ V | Acc]).
+
+
+oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
+ [];
+oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
+ Consumer = {
+ OAuth#oauth.consumer_key,
+ OAuth#oauth.consumer_secret,
+ OAuth#oauth.signature_method
+ },
+ Method = case get_value(method, ConnParams, get) of
+ get -> "GET";
+ post -> "POST";
+ put -> "PUT";
+ head -> "HEAD"
+ end,
+ QSL = get_value(qs, ConnParams, []),
+ OAuthParams = oauth:signed_params(Method,
+ BaseUrl ++ get_value(path, ConnParams, []),
+ QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
+ [{"Authorization",
+ "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
+
+
+do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+ stop_worker(Worker, HttpDb),
+ RedirectUrl = redirect_url(Headers, Url),
+ {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
+ send_req(HttpDb2, Params2, Cb).
+
+
+redirect_url(RespHeaders, OrigUrl) ->
+ MochiHeaders = mochiweb_headers:make(RespHeaders),
+ RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+ #url{
+ host = Base,
+ port = Port,
+ path = Path, % includes query string
+ protocol = Proto
+ } = ibrowse_lib:parse_url(RedUrl),
+ #url{
+ username = User,
+ password = Passwd
+ } = ibrowse_lib:parse_url(OrigUrl),
+ Creds = case is_list(User) andalso is_list(Passwd) of
+ true ->
+ User ++ ":" ++ Passwd ++ "@";
+ false ->
+ []
+ end,
+ atom_to_list(Proto) ++ "://" ++ Creds ++ Base ++ ":" ++
+ integer_to_list(Port) ++ Path.
+
+after_redirect(RedirectUrl, 303, HttpDb, Params) ->
+ after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
+after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
+ after_redirect(RedirectUrl, HttpDb, Params).
+
+after_redirect(RedirectUrl, HttpDb, Params) ->
+ Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
+ {HttpDb#httpdb{url = RedirectUrl}, Params2}.
Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Wed Feb 16 20:05:31 2011
@@ -17,6 +17,9 @@
-define(MIN_STR, <<"">>).
-define(MAX_STR, <<255>>). % illegal utf string
+% the lowest possible database sequence number
+-define(LOWEST_SEQ, 0).
+
-define(JSON_ENCODE(V), couch_util:json_encode(V)).
-define(JSON_DECODE(V), couch_util:json_decode(V)).
@@ -243,27 +246,6 @@
view_states=nil
}).
--record(http_db, {
- url,
- auth = [],
- resource = "",
- headers = [
- {"User-Agent", "CouchDB/"++couch_server:get_version()},
- {"Accept", "application/json"},
- {"Accept-Encoding", "gzip"}
- ],
- qs = [],
- method = get,
- body = nil,
- options = [
- {response_format,binary},
- {inactivity_timeout, 30000}
- ],
- retries = 10,
- pause = 500,
- conn = nil
-}).
-
% small value used in revision trees to indicate the revision isn't stored
-define(REV_MISSING, []).
Modified: couchdb/trunk/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_doc.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_doc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_doc.erl Wed Feb 16 20:05:31 2011
@@ -19,6 +19,7 @@
-export([doc_from_multi_part_stream/2]).
-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
-export([to_path/1]).
+-export([mp_parse_doc/2]).
-include("couch_db.hrl").
Added: couchdb/trunk/src/couchdb/couch_httpc_pool.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpc_pool.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpc_pool.erl (added)
+++ couchdb/trunk/src/couchdb/couch_httpc_pool.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,165 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module is similar to ibrowse's ibrowse_lb.erl (load balancer) module.
+% The main differences are:
+%
+% 1) Several HTTP connection pools can be spawned. This is important for
+%    replications, as each replication can have its own pool, which allows
+%    for better error isolation: connections (and their pipelines) are not
+%    shared between different replications;
+%
+% 2) The caller can request both pipelined and non-pipelined connections.
+%
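+% A brief usage sketch (option values are illustrative, matching the shape
+% used by couch_api_wrap_httpc:setup/1):
+%
+%   {ok, Pool} = couch_httpc_pool:start_link(
+%       ibrowse_lib:parse_url("http://localhost:5984/db/"),
+%       [{max_piped_connections, 20}, {pipeline_size, 50}]),
+%   {ok, Worker} = couch_httpc_pool:get_worker(Pool),
+%   ... use Worker with ibrowse:send_req_direct/7 ...
+%   ok = couch_httpc_pool:release_worker(Pool, Worker),
+%   ok = couch_httpc_pool:stop(Pool)
+%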
+-module(couch_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_piped_worker/1]).
+-export([get_worker/1, release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+-record(state, {
+ url,
+ ssl_options,
+ max_piped_workers,
+ pipeline_size,
+ piped_workers,
+ used_piped_workers = 0,
+ free_workers = [], % free workers (connections) without pipeline
+ busy_workers = [] % busy workers (connections) without pipeline
+}).
+
+
+start_link(BaseUrl, Options) ->
+ gen_server:start_link(?MODULE, {BaseUrl, Options}, []).
+
+
+stop(Pool) ->
+ ok = gen_server:call(Pool, stop, infinity).
+
+
+get_piped_worker(Pool) ->
+ gen_server:call(Pool, get_piped_worker, infinity).
+
+
+get_worker(Pool) ->
+ gen_server:call(Pool, get_worker, infinity).
+
+
+% Only workers without a pipeline need to be released.
+release_worker(Pool, Worker) ->
+ ok = gen_server:call(Pool, {release_worker, Worker}, infinity).
+
+
+init({BaseUrl, Options}) ->
+ process_flag(trap_exit, true),
+ State = #state{
+ url = BaseUrl,
+ ssl_options = get_value(ssl_options, Options, []),
+ pipeline_size = get_value(pipeline_size, Options),
+ max_piped_workers = get_value(max_piped_connections, Options),
+ piped_workers = ets:new(httpc_pool, [ordered_set, public])
+ },
+ {ok, State}.
+
+
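+% Piped workers live in an ordered_set ets table keyed by
+% {NumberOfPipelinedRequests, WorkerPid}, so ets:first/1 always yields the
+% least-loaded worker. While under the max worker count a new worker is
+% spawned; afterwards the least-loaded worker is reused as long as its
+% pipeline has room, and retry_later is returned once all pipelines are full.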
+handle_call(get_piped_worker, _From,
+ #state{piped_workers = WorkersEts, max_piped_workers = Max,
+ used_piped_workers = Used, url = Url,
+ ssl_options = SslOptions} = State) when Used < Max ->
+ {ok, Worker} = ibrowse_http_client:start_link({WorkersEts, Url,
+ {SslOptions, SslOptions =/= []}}),
+ true = ets:insert(WorkersEts, {{1, Worker}, []}),
+ {reply, {ok, Worker}, State#state{used_piped_workers = Used + 1}};
+
+handle_call(get_piped_worker, _From,
+ #state{piped_workers = WorkersEts, pipeline_size = PipeSize} = State) ->
+ case ets:first(WorkersEts) of
+ {NumSessions, Worker} when NumSessions < PipeSize ->
+ true = ets:delete(WorkersEts, {NumSessions, Worker}),
+ true = ets:insert(WorkersEts, {{NumSessions + 1, Worker}, []}),
+ {reply, {ok, Worker}, State};
+ _ ->
+ {reply, retry_later, State}
+ end;
+
+handle_call(get_worker, _From, #state{
+ free_workers = [], busy_workers = Busy,
+ url = #url{host = Host, port = Port}} = State) ->
+ {ok, Worker} = ibrowse_http_client:start_link({Host, Port}),
+ {reply, {ok, Worker}, State#state{busy_workers = [Worker | Busy]}};
+
+handle_call(get_worker, _From, #state{
+ free_workers = [Worker | RestFree], busy_workers = Busy} = State) ->
+ {reply, {ok, Worker}, State#state{
+ busy_workers = [Worker | Busy], free_workers = RestFree}};
+
+handle_call({release_worker, Worker}, _From, #state{
+ free_workers = Free, busy_workers = Busy} = State) ->
+ case Busy -- [Worker] of
+ Busy ->
+ {reply, ok, State};
+ Busy2 ->
+ {reply, ok, State#state{
+ busy_workers = Busy2, free_workers = [Worker | Free]}}
+ end;
+
+handle_call(stop, _From, #state{piped_workers = WorkersEts,
+ free_workers = Free, busy_workers = Busy} = State) ->
+ ets:foldl(
+ fun({{_, W}, _}, _) -> ibrowse_http_client:stop(W) end, ok, WorkersEts),
+ lists:foreach(fun ibrowse_http_client:stop/1, Free),
+ lists:foreach(fun ibrowse_http_client:stop/1, Busy),
+ {stop, normal, ok, State}.
+
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+
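+% A worker died: remove it from whichever collection it belongs to (the
+% free list, the busy list, or the piped-workers table).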
+handle_info({'EXIT', Pid, _Reason}, #state{
+ piped_workers = WorkersEts, used_piped_workers = Used,
+ busy_workers = Busy, free_workers = Free} = State) ->
+ case Free -- [Pid] of
+ Free ->
+ case Busy -- [Pid] of
+ Busy ->
+ true = ets:match_delete(WorkersEts, {{'_', Pid}, '_'}),
+ {noreply, State#state{used_piped_workers = Used - 1}};
+ Busy2 ->
+ {noreply, State#state{busy_workers = Busy2}}
+ end;
+ Free2 ->
+ {noreply, State#state{free_workers = Free2}}
+ end.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
Modified: couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl Wed Feb 16 20:05:31 2011
@@ -13,7 +13,7 @@
-module(couch_httpd_misc_handlers).
-export([handle_welcome_req/2,handle_favicon_req/2,handle_utils_dir_req/2,
- handle_all_dbs_req/1,handle_replicate_req/1,handle_restart_req/1,
+ handle_all_dbs_req/1,handle_restart_req/1,
handle_uuids_req/1,handle_config_req/1,handle_log_req/1,
handle_task_status_req/1]).
@@ -78,36 +78,6 @@ handle_task_status_req(#httpd{method='GE
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-handle_replicate_req(#httpd{method='POST'}=Req) ->
- couch_httpd:validate_ctype(Req, "application/json"),
- PostBody = couch_httpd:json_body_obj(Req),
- try couch_rep:replicate(PostBody, Req#httpd.user_ctx) of
- {ok, {continuous, RepId}} ->
- send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
- {ok, {cancelled, RepId}} ->
- send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
- {ok, {JsonResults}} ->
- send_json(Req, {[{ok, true} | JsonResults]});
- {error, {Type, Details}} ->
- send_json(Req, 500, {[{error, Type}, {reason, Details}]});
- {error, not_found} ->
- send_json(Req, 404, {[{error, not_found}]});
- {error, Reason} ->
- try
- send_json(Req, 500, {[{error, Reason}]})
- catch
- exit:{json_encode, _} ->
- send_json(Req, 500, {[{error, couch_util:to_binary(Reason)}]})
- end
- catch
- throw:{db_not_found, Msg} ->
- send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]});
- throw:{unauthorized, Msg} ->
- send_json(Req, 404, {[{error, unauthorized}, {reason, Msg}]})
- end;
-handle_replicate_req(Req) ->
- send_method_not_allowed(Req, "POST").
-
handle_restart_req(#httpd{method='POST'}=Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
Added: couchdb/trunk/src/couchdb/couch_httpd_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_replicator.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_replicator.erl (added)
+++ couchdb/trunk/src/couchdb/couch_httpd_replicator.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,50 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpd_replicator).
+
+-include("couch_db.hrl").
+
+-import(couch_httpd, [
+ send_json/2,
+ send_json/3,
+ send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+ to_binary/1
+]).
+
+-export([handle_req/1]).
+
+
+handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ RepDoc = couch_httpd:json_body_obj(Req),
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
+ case couch_replicator:replicate(Rep) of
+ {error, {Error, Reason}} ->
+ send_json(
+ Req, 404,
+ {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, to_binary(Reason)}]});
+ {ok, {cancelled, RepId}} ->
+ send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+ {ok, {continuous, RepId}} ->
+ send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+ {ok, {HistoryResults}} ->
+ send_json(Req, {[{ok, true} | HistoryResults]})
+ end;
+
+handle_req(Req) ->
+ send_method_not_allowed(Req, "POST").
Modified: couchdb/trunk/src/couchdb/couch_primary_sup.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_primary_sup.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_primary_sup.erl (original)
+++ couchdb/trunk/src/couchdb/couch_primary_sup.erl Wed Feb 16 20:05:31 2011
@@ -43,6 +43,12 @@ init([]) ->
brutal_kill,
worker,
dynamic},
+ {couch_replication_event,
+ {gen_event, start_link, [{local, couch_replication}]},
+ permanent,
+ brutal_kill,
+ worker,
+ dynamic},
{couch_replication_supervisor,
{couch_rep_sup, start_link, []},
permanent,
Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Wed Feb 16 20:05:31 2011
@@ -17,11 +17,21 @@
-export([code_change/3, terminate/2]).
-include("couch_db.hrl").
+-include("couch_replicator.hrl").
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
-define(INITIAL_WAIT, 5).
+-import(couch_replicator_utils, [
+ parse_rep_doc/2,
+ update_rep_doc/2
+]).
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
-record(state, {
changes_feed_loop = nil,
db_notifier = nil,
@@ -30,11 +40,6 @@
max_retries
}).
--import(couch_util, [
- get_value/2,
- get_value/3
-]).
-
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -77,7 +82,7 @@ handle_call({restart_failure, {Props} =
?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
"the document `~s`. Last error reason was: ~p",
[pp_rep_id(RepId), MaxRetries, DocId, Error]),
- couch_rep:update_rep_doc(
+ update_rep_doc(
RepDoc,
[{<<"_replication_state">>, <<"error">>},
{<<"_replication_id">>, ?l2b(BaseId)}]),
@@ -153,7 +158,7 @@ code_change(_OldVsn, State, _Extra) ->
changes_feed_loop() ->
- {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ {ok, RepDb} = couch_replicator_utils:ensure_rep_db_exists(),
Server = self(),
Pid = spawn_link(
fun() ->
@@ -266,18 +271,17 @@ rep_user_ctx({RepDoc}) ->
maybe_start_replication(#state{max_retries = MaxRetries} = State,
DocId, JsonRepDoc) ->
- UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ {ok, #rep{id = {BaseId, _} = RepId} = Rep} =
+ parse_rep_doc(JsonRepDoc, rep_user_ctx(JsonRepDoc)),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
Server = self(),
- Pid = spawn_link(fun() ->
- start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
- end),
+ Pid = spawn_link(
+ fun() -> start_replication(Server, Rep, MaxRetries) end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
    [{BaseId, {DocId, _}}] ->
State;
[{BaseId, {OtherDocId, false}}] ->
?LOG_INFO("The replication specified by the document `~s` was already"
@@ -297,42 +301,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc,
RepId ->
ok;
_ ->
- couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
+ update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
end.
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- Pid when is_pid(Pid) ->
- ?LOG_INFO("Document `~s` triggered replication `~s`",
- [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+start_replication(Server, #rep{id = RepId, doc = {RepProps}} = Rep, MaxRetries) ->
+ case (catch couch_replicator:async_replicate(Rep)) of
+ {ok, _} ->
ok = gen_server:call(Server, {triggered, RepId}, infinity),
- couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ ?LOG_INFO("Document `~s` triggered replication `~s`",
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]);
Error ->
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
+ keep_retrying(Server, Rep, Error, ?INITIAL_WAIT, MaxRetries)
end.
-keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
- ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+keep_retrying(Server, Rep, Error, _Wait, 0) ->
+ ok = gen_server:call(Server, {restart_failure, Rep, Error}, infinity);
-keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+keep_retrying(Server, #rep{doc = {RepProps}} = Rep, Error, Wait, RetriesLeft) ->
?LOG_ERROR("Error starting replication `~s`: ~p. "
- "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+ "Retrying in ~p seconds", [pp_rep_id(Rep), Error, Wait]),
ok = timer:sleep(Wait * 1000),
- case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- Pid when is_pid(Pid) ->
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
- {RepProps} = RepDoc,
+ case (catch couch_replicator:async_replicate(Rep)) of
+ {ok, _} ->
+ ok = gen_server:call(Server, {triggered, Rep#rep.id}, infinity),
DocId = get_value(<<"_id">>, RepProps),
[{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
- [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
- couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]);
NewError ->
- keep_retrying(
- Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
+ keep_retrying(Server, Rep, NewError, Wait * 2, RetriesLeft - 1)
end.
@@ -359,7 +358,7 @@ replication_complete(DocId) ->
stop_replication(DocId) ->
case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
[{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
- couch_rep:end_replication(RepId),
+ couch_replicator:cancel_replication(RepId),
true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
{ok, RepId};
@@ -372,7 +371,7 @@ stop_all_replications() ->
?LOG_INFO("Stopping all ongoing replications because the replicator DB "
"was deleted or changed", []),
ets:foldl(
- fun({_, {RepId, _}}, _) -> couch_rep:end_replication(RepId) end,
+ fun({_, {RepId, _}}, _) -> couch_replicator:cancel_replication(RepId) end,
ok,
?DOC_ID_TO_REP_ID
),
@@ -381,5 +380,7 @@ stop_all_replications() ->
% pretty-print replication id
+pp_rep_id(#rep{id = RepId}) ->
+ pp_rep_id(RepId);
pp_rep_id({Base, Extension}) ->
Base ++ Extension.
Added: couchdb/trunk/src/couchdb/couch_replication_notifier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_notifier.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_notifier.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replication_notifier.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replication_notifier).
+
+-behaviour(gen_event).
+
+% public API
+-export([start_link/1, stop/1, notify/1]).
+
+% gen_event callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_event/2, handle_call/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+start_link(FunAcc) ->
+ couch_event_sup:start_link(couch_replication,
+ {couch_replication_notifier, make_ref()}, FunAcc).
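+
+% A small usage sketch (the event shape matches what couch_replicator's
+% rep_result_listener/1 consumes):
+%
+%   {ok, Listener} = couch_replication_notifier:start_link(
+%       fun({finished, RepId, _Result}) -> io:format("~p done~n", [RepId]);
+%          (_) -> ok
+%       end),
+%   ...
+%   couch_replication_notifier:stop(Listener)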
+
+notify(Event) ->
+ gen_event:notify(couch_replication, Event).
+
+stop(Pid) ->
+ couch_event_sup:stop(Pid).
+
+
+init(FunAcc) ->
+ {ok, FunAcc}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_event(Event, Fun) when is_function(Fun, 1) ->
+ Fun(Event),
+ {ok, Fun};
+handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
+ Acc2 = Fun(Event, Acc),
+ {ok, {Fun, Acc2}}.
+
+handle_call(_Msg, State) ->
+ {reply, ok, State}.
+
+handle_info(_Msg, State) ->
+ {ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
Added: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,776 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator).
+-behaviour(gen_server).
+
+% public API
+-export([replicate/1]).
+
+% meant to be used only by the replicator database listener
+-export([async_replicate/1]).
+-export([cancel_replication/1]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+-import(couch_replicator_utils, [
+ update_rep_doc/2,
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1
+]).
+
+-record(rep_state, {
+ rep_details,
+ source_name,
+ target_name,
+ source,
+ target,
+ history,
+ checkpoint_history,
+ start_seq,
+ committed_seq,
+ current_through_seq,
+ seqs_in_progress = [],
+ highest_seq_done = ?LOWEST_SEQ,
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ timer, % checkpoint timer
+ missing_revs_queue,
+ changes_queue,
+ changes_reader,
+ missing_rev_finders,
+ workers,
+ stats = #rep_stats{},
+ session_id,
+ source_db_compaction_notifier = nil,
+ target_db_compaction_notifier = nil,
+ source_monitor = nil,
+ target_monitor = nil
+}).
+
+
+replicate(#rep{id = RepId, options = Options} = Rep) ->
+ case get_value(cancel, Options, false) of
+ true ->
+ cancel_replication(RepId);
+ false ->
+ {ok, Listener} = rep_result_listener(RepId),
+ Result = do_replication_loop(Rep),
+ couch_replication_notifier:stop(Listener),
+ Result
+ end.
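+
+% A rough usage sketch (the JSON body is illustrative), mirroring what
+% couch_httpd_replicator:handle_req/1 does:
+%
+%   {ok, Rep} = couch_replicator_utils:parse_rep_doc(
+%       {[{<<"source">>, <<"http://localhost:5984/foo/">>},
+%         {<<"target">>, <<"bar">>}]}, UserCtx),
+%   {ok, {HistoryResults}} = couch_replicator:replicate(Rep)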
+
+
+do_replication_loop(#rep{id = {BaseId,_} = Id, options = Options} = Rep) ->
+ case async_replicate(Rep) of
+ {ok, _Pid} ->
+ case get_value(continuous, Options, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ wait_for_result(Id)
+ end;
+ Error ->
+ Error
+ end.
+
+
+async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+ RepChildId = BaseId ++ Ext,
+ Source = couch_api_wrap:db_uri(Src),
+ Target = couch_api_wrap:db_uri(Tgt),
+ ChildSpec = {
+ RepChildId,
+ {gen_server, start_link, [?MODULE, Rep, []]},
+ transient,
+ 1,
+ worker,
+ [?MODULE]
+ },
+    % All these nested cases to attempt starting/restarting a replication
+    % child are ugly and not 100% free of race conditions. The following
+    % patch submission is a solution:
+ %
+ % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
+ %
+ case supervisor:start_child(couch_rep_sup, ChildSpec) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
+ [RepChildId, Pid, Source, Target]),
+ {ok, Pid};
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, RepChildId) of
+ {ok, Pid} ->
+ ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
+ [RepChildId, Pid, Source, Target]),
+ {ok, Pid};
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, ChildSpec),
+ ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+ [RepChildId, Pid, Source, Target]),
+ {ok, Pid};
+ {error, _} = Error ->
+ Error
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+ [RepChildId, Pid, Source, Target]),
+ {ok, Pid};
+ {error, {Error, _}} ->
+ {error, Error}
+ end.
+
+
+rep_result_listener(RepId) ->
+ ReplyTo = self(),
+ {ok, _Listener} = couch_replication_notifier:start_link(
+ fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+ ReplyTo ! Ev;
+ (_) ->
+ ok
+ end).
+
+
+wait_for_result(RepId) ->
+ receive
+ {finished, RepId, RepResult} ->
+ {ok, RepResult};
+ {error, RepId, Reason} ->
+ {error, Reason}
+ end.
+
+
+cancel_replication({BaseId, Extension}) ->
+ FullRepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, FullRepId) of
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, FullRepId),
+ {ok, {cancelled, ?l2b(BaseId)}};
+ Error ->
+ Error
+ end.
+
+
+init(InitArgs) ->
+ try
+ do_init(InitArgs)
+ catch
+ throw:{unauthorized, DbUri} ->
+ {stop, {unauthorized,
+ <<"unauthorized to access database ", DbUri/binary>>}};
+ throw:{db_not_found, DbUri} ->
+ {stop, {db_not_found, <<"could not open ", DbUri/binary>>}};
+ throw:Error ->
+ {stop, Error}
+ end.
+
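+% A rough sketch of the process pipeline assembled below (one rev finder
+% is started per doc copier, i.e. RevFindersCount =:= CopiersCount):
+%
+%   changes reader -> ChangesQueue -> rev finders -> MissingRevsQueue
+%       -> doc copiers -> target database
+%
+% Progress is reported back to this gen_server via the report_seq and
+% report_seq_done casts handled further below.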
+do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
+ process_flag(trap_exit, true),
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ source_name = SourceName,
+ target_name = TargetName,
+ start_seq = StartSeq
+ } = State = init_state(Rep),
+
+ CopiersCount = get_value(worker_processes, Options),
+ RevFindersCount = CopiersCount,
+ BatchSize = get_value(worker_batch_size, Options),
+ {ok, MissingRevsQueue} = couch_work_queue:new([
+ {multi_workers, true},
+ {max_items, trunc(CopiersCount * 2.0)}
+ ]),
+ {ok, ChangesQueue} = couch_work_queue:new([
+ {multi_workers, true},
+ {max_items, trunc(BatchSize * RevFindersCount * 2.0)}
+ ]),
+ % This starts the _changes reader process. It adds the changes from
+ % the source db to the ChangesQueue.
+ ChangesReader = spawn_changes_reader(
+ StartSeq, Source, ChangesQueue, Options),
+ % This starts the missing rev finders. They take batches of changes from
+ % the ChangesQueue and check which of those revisions already exist on
+ % the target. Revisions missing from the target are added to the
+ % MissingRevsQueue.
+ MissingRevFinders = lists:map(
+ fun(_) ->
+ {ok, Pid} = couch_replicator_rev_finder:start_link(
+ self(), Target, ChangesQueue, MissingRevsQueue, BatchSize),
+ Pid
+ end,
+ lists:seq(1, RevFindersCount)),
+ % This starts the doc copy processes. They fetch documents from the
+ % MissingRevsQueue and copy them from the source to the target database.
+ MaxHttpConns = get_value(http_connections, Options),
+ HttpPipeSize = get_value(http_pipeline_size, Options),
+ MaxParallelConns = (MaxHttpConns * HttpPipeSize) div CopiersCount,
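+ % For illustration, with hypothetical values of 20 HTTP connections, a
+ % pipeline size of 2 and 4 copiers, each doc copier gets
+ % (20 * 2) div 4 = 10 parallel connections.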
+ Workers = lists:map(
+ fun(_) ->
+ {ok, Pid} = couch_replicator_doc_copier:start_link(
+ self(), Source, Target, MissingRevsQueue, MaxParallelConns),
+ Pid
+ end,
+ lists:seq(1, CopiersCount)),
+
+ maybe_set_triggered(Rep),
+
+ couch_task_status:add_task(
+ "Replication",
+ io_lib:format("`~s`: `~s` -> `~s`",
+ [BaseId ++ Ext, SourceName, TargetName]), "Starting"),
+
+ % Restarting a supervised child means that the original arguments (the
+ % #rep{} record) specified in the MFA component of the supervisor child
+ % spec are reused on every restart, so the same replication performance
+ % tuning parameters will always be used. See the following threads for
+ % details:
+ %
+ % http://www.erlang.org/cgi-bin/ezmlm-cgi?3:sss:1772:201012:kihiniifeclgnpodlipd#b
+ % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
+ %
+ % The current solution is to delete the child spec (see cancel_replication/1)
+ % and then start the replication again, but this is unfortunately not immune
+ % to race conditions.
+
+ ?LOG_INFO("Replication `~p` is using:~n"
+ "~c~p worker processes~n"
+ "~ca worker batch size of ~p~n"
+ "~c~p HTTP connections, each with a pipeline size of ~p~n"
+ "~ca connection timeout of ~p milliseconds~n"
+ "~csocket options are: ~s",
+ [BaseId ++ Ext, $\t, CopiersCount, $\t, BatchSize, $\t, MaxHttpConns,
+ HttpPipeSize, $\t, get_value(connection_timeout, Options),
+ $\t, io_lib:format("~p", [get_value(socket_options, Options)])]),
+
+ ?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]),
+ ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
+
+ {ok, State#rep_state{
+ missing_revs_queue = MissingRevsQueue,
+ changes_queue = ChangesQueue,
+ changes_reader = ChangesReader,
+ missing_rev_finders = MissingRevFinders,
+ workers = Workers
+ }
+ }.
+
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
+ ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
+ {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
+ ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
+ {stop, target_db_down, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+ ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
+ {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
+ ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
+ {stop, missing_revs_queue_died, cancel_timer(St)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, State) ->
+ #rep_state{
+ workers = Workers,
+ missing_rev_finders = RevFinders,
+ missing_revs_queue = RevsQueue
+ } = State,
+ case lists:member(Pid, RevFinders) of
+ false ->
+ case lists:member(Pid, Workers) of
+ false ->
+ {stop, {unknown_process_died, Pid, normal}, State};
+ true ->
+ case Workers -- [Pid] of
+ [] ->
+ {stop, normal, do_last_checkpoint(State)};
+ Workers2 ->
+ {noreply, State#rep_state{workers = Workers2}}
+ end
+ end;
+ true ->
+ case RevFinders -- [Pid] of
+ [] ->
+ couch_work_queue:close(RevsQueue),
+ {noreply, State#rep_state{missing_rev_finders = []}};
+ RevFinders2 ->
+ {noreply, State#rep_state{missing_rev_finders = RevFinders2}}
+ end
+ end;
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+ #rep_state{
+ workers = Workers,
+ missing_rev_finders = RevFinders
+ } = State,
+ State2 = cancel_timer(State),
+ case lists:member(Pid, Workers) of
+ false ->
+ case lists:member(Pid, RevFinders) of
+ false ->
+ {stop, {unknown_process_died, Pid, Reason}, State2};
+ true ->
+ ?LOG_ERROR("RevsFinder ~p died with reason: ~p", [Pid, Reason]),
+ {stop, {revs_finder_died, Pid, Reason}, State2}
+ end;
+ true ->
+ ?LOG_ERROR("DocCopier ~p died with reason: ~p", [Pid, Reason]),
+ {stop, {doc_copier_died, Pid, Reason}, State2}
+ end.
+
+
+handle_call(Msg, _From, State) ->
+ ?LOG_ERROR("Replicator received an unexpected synchronous call: ~p", [Msg]),
+ {stop, unexpected_sync_message, State}.
+
+
+handle_cast({db_compacted, DbName},
+ #rep_state{source = #db{name = DbName} = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+ #rep_state{target = #db{name = DbName} = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+ State2 = do_checkpoint(State),
+ {noreply, State2#rep_state{timer = start_timer(State)}};
+
+handle_cast({report_seq, Seq},
+ #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+ {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}};
+
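+% Sequence bookkeeping by example (all values made up): with
+% seqs_in_progress = [5, 6, 9], a worker reporting seq 6 as done only
+% shrinks the set to [5, 9] (the through seq cannot advance while 5 is
+% still in flight), whereas a worker reporting seq 5 as done advances the
+% through seq to 5. Once the set becomes empty, the through seq jumps to
+% the highest seq done. do_checkpoint/1 never records a seq beyond
+% current_through_seq.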
+handle_cast({report_seq_done, Seq, StatsInc},
+ #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq, stats = Stats} = State) ->
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ case NewThroughSeq of
+ ThroughSeq ->
+ ok;
+ _ ->
+ couch_task_status:update("Processed source seq ~p", [NewThroughSeq])
+ end,
+ NewState = State#rep_state{
+ stats = sum_stats([Stats, StatsInc]),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone
+ },
+ {noreply, NewState};
+
+handle_cast({add_stats, StatsInc}, #rep_state{stats = Stats} = State) ->
+ {noreply, State#rep_state{stats = sum_stats([Stats, StatsInc])}};
+
+handle_cast(Msg, State) ->
+ ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
+ {stop, unexpected_async_message, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(normal, #rep_state{rep_details = #rep{id = RepId, doc = RepDoc},
+ checkpoint_history = CheckpointHistory} = State) ->
+ terminate_cleanup(State),
+ update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"completed">>}]),
+ couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
+
+terminate(shutdown, State) ->
+ % replication was cancelled through ?MODULE:cancel_replication/1
+ terminate_cleanup(State);
+
+terminate(Reason, #rep_state{rep_details = Rep} = State) ->
+ terminate_cleanup(State),
+ update_rep_doc(Rep#rep.doc, [{<<"_replication_state">>, <<"error">>}]),
+ couch_replication_notifier:notify({error, Rep#rep.id, Reason}).
+
+
+terminate_cleanup(State) ->
+ couch_task_status:update("Finishing"),
+ stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
+ stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
+ couch_api_wrap:db_close(State#rep_state.source),
+ couch_api_wrap:db_close(State#rep_state.target).
+
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = ?LOWEST_SEQ} = State) ->
+ cancel_timer(State);
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = Seq} = State) ->
+ cancel_timer(do_checkpoint(State#rep_state{current_through_seq = Seq})).
+
+
+start_timer(State) ->
+ After = checkpoint_interval(State),
+ case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+ {ok, Ref} ->
+ Ref;
+ Error ->
+ ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]),
+ nil
+ end.
+
+
+cancel_timer(#rep_state{timer = nil} = State) ->
+ State;
+cancel_timer(#rep_state{timer = Timer} = State) ->
+ {ok, cancel} = timer:cancel(Timer),
+ State#rep_state{timer = nil}.
+
+
+init_state(Rep) ->
+ #rep{
+ source = Src, target = Tgt,
+ options = Options, user_ctx = UserCtx
+ } = Rep,
+ {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+ {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+ get_value(create_target, Options, false)),
+
+ {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
+ {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
+
+ [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
+
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+ #doc{body={CheckpointHistory}} = SourceLog,
+ State = #rep_state{
+ rep_details = Rep,
+ source_name = couch_api_wrap:db_uri(Source),
+ target_name = couch_api_wrap:db_uri(Target),
+ source = Source,
+ target = Target,
+ history = History,
+ checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
+ start_seq = StartSeq,
+ current_through_seq = StartSeq,
+ committed_seq = StartSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = httpd_util:rfc1123_date(),
+ src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+ tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ session_id = couch_uuids:random(),
+ source_db_compaction_notifier =
+ start_db_compaction_notifier(Source, self()),
+ target_db_compaction_notifier =
+ start_db_compaction_notifier(Target, self()),
+ source_monitor = db_monitor(Source),
+ target_monitor = db_monitor(Target)
+ },
+ State#rep_state{timer = start_timer(State)}.
+
+
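+% Replication checkpoint logs are stored as _local documents on both the
+% source and the target, under an id derived from the replication id
+% (e.g., for a made up BaseId, "_local/a1b2c3"). If no log is found,
+% fold_replication_logs/6 retries with ids produced by older
+% ?REP_ID_VERSION algorithms and migrates any old log it finds to the
+% current id.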
+find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+ lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+ case couch_api_wrap:open_doc(Db, LogId, []) of
+ {error, <<"not_found">>} when Vsn > 1 ->
+ OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+ {error, <<"not_found">>} ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+ end.
+
+
+spawn_changes_reader(StartSeq, Source, ChangesQueue, Options) ->
+ spawn_link(
+ fun()->
+ couch_api_wrap:changes_since(Source, all_docs, StartSeq,
+ fun(DocInfo) ->
+ ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+ end, Options),
+ couch_work_queue:close(ChangesQueue)
+ end).
+
+
+% Interval between checkpoints, in milliseconds (currently hard-coded; the
+% replication state is ignored).
+checkpoint_interval(_State) ->
+ 5000.
+
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+ State;
+do_checkpoint(State) ->
+ #rep_state{
+ source_name=SourceName,
+ target_name=TargetName,
+ source = Source,
+ target = Target,
+ history = OldHistory,
+ start_seq = StartSeq,
+ current_through_seq = NewSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ rep_details = #rep{options = Options, id = {BaseId, Ext}},
+ session_id = SessionId
+ } = State,
+ case commit_to_both(Source, Target) of
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+ [SourceName, TargetName, NewSeq]),
+ StartTime = ?l2b(ReplicationStartTime),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeq},
+ {<<"end_last_seq">>, NewSeq},
+ {<<"recorded_seq">>, NewSeq},
+ {<<"missing_checked">>, Stats#rep_stats.missing_checked},
+ {<<"missing_found">>, Stats#rep_stats.missing_found},
+ {<<"docs_read">>, Stats#rep_stats.docs_read},
+ {<<"docs_written">>, Stats#rep_stats.docs_written},
+ {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeq}
+ ] ++ case get_value(doc_ids, Options) of
+ undefined ->
+ [];
+ _DocIds ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ % TODO: deprecate (use same history format, simplify code)
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, Stats#rep_stats.docs_read},
+ {<<"docs_written">>, Stats#rep_stats.docs_written},
+ {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ try
+ {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source,
+ SourceLog#doc{body=NewRepHistory}, [delay_commit]),
+ {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target,
+ TargetLog#doc{body=NewRepHistory}, [delay_commit]),
+ State#rep_state{
+ checkpoint_history = NewRepHistory,
+ committed_seq = NewSeq,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }
+ catch throw:conflict ->
+ ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+ "yourself?)", []),
+ State
+ end;
+ _Else ->
+ ?LOG_INFO("rebooting replication `~s` (`~s` -> `~s`) from last known "
+ "replication checkpoint", [BaseId ++ Ext, SourceName, TargetName]),
+ RepInfo = io_lib:format("replication `~s` (`~s` -> `~s`)",
+ [BaseId ++ Ext, SourceName, TargetName]),
+ exit({checkpoint_commit_failure, lists:flatten(RepInfo)})
+ end.
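+% For illustration only, the checkpoint log body written by do_checkpoint/1
+% looks roughly like this (values are made up):
+%
+%     {"session_id": "uuid", "source_last_seq": 42,
+%      "history": [
+%          {"session_id": "uuid", "start_time": "...", "end_time": "...",
+%           "start_last_seq": 10, "end_last_seq": 42, "recorded_seq": 42,
+%           "missing_checked": 32, "missing_found": 5, "docs_read": 5,
+%           "docs_written": 5, "doc_write_failures": 0},
+%          ...]}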
+
+
+commit_to_both(Source, Target) ->
+ % commit the source asynchronously, in a separate process
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(
+ fun() ->
+ ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)}
+ end),
+
+ % commit the target synchronously, in this process
+ {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target),
+
+ SourceStartTime =
+ receive
+ {SrcCommitPid, {ok, Timestamp}} ->
+ receive
+ {'EXIT', SrcCommitPid, normal} ->
+ ok
+ end,
+ Timestamp;
+ {'EXIT', SrcCommitPid, _} ->
+ exit(replication_link_failure)
+ end,
+ {SourceStartTime, TargetStartTime}.
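+% do_checkpoint/1 compares the instance start times returned here with the
+% ones captured when the replication started; a mismatch indicates that a
+% database was restarted mid-replication, so no checkpoint is recorded and
+% the replication is restarted from the last known checkpoint.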
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ case get_value(<<"session_id">>, RepRecProps) ==
+ get_value(<<"session_id">>, RepRecPropsTgt) of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
+ OldHistory = get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+ ?LOG_INFO("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
+ end.
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+ ?LOG_INFO("no common ancestry -- performing full replication", []),
+ {?LOWEST_SEQ, []};
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+ SourceId = get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
+ end.
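+% A worked example with made up session ids: given source history sessions
+% [s4, s3, s1] and target history sessions [s5, s3, s1], the first pass
+% finds neither s4 on the target nor s5 in the source tail, so both lists
+% advance; the second pass finds s3 in the target history and replication
+% resumes from the source's recorded_seq for s3.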
+
+
+has_session_id(_SessionId, []) ->
+ false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case get_value(<<"session_id">>, Props, nil) of
+ SessionId ->
+ true;
+ _Else ->
+ has_session_id(SessionId, Rest)
+ end.
+
+
+sum_stats([Stats1 | RestStats]) ->
+ lists:foldl(
+ fun(Stats, Acc) ->
+ #rep_stats{
+ missing_checked = Stats#rep_stats.missing_checked +
+ Acc#rep_stats.missing_checked,
+ missing_found = Stats#rep_stats.missing_found +
+ Acc#rep_stats.missing_found,
+ docs_read = Stats#rep_stats.docs_read + Acc#rep_stats.docs_read,
+ docs_written = Stats#rep_stats.docs_written +
+ Acc#rep_stats.docs_written,
+ doc_write_failures = Stats#rep_stats.doc_write_failures +
+ Acc#rep_stats.doc_write_failures
+ }
+ end,
+ Stats1, RestStats).
+
+
+maybe_set_triggered(#rep{id = {BaseId, _}, doc = {RepProps} = RepDoc}) ->
+ case get_value(<<"_replication_state">>, RepProps) of
+ <<"triggered">> ->
+ ok;
+ _ ->
+ update_rep_doc(
+ RepDoc,
+ [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}
+ ])
+ end.
+
+
+db_monitor(#db{} = Db) ->
+ couch_db:monitor(Db);
+db_monitor(_HttpDb) ->
+ nil.
Added: couchdb/trunk/src/couchdb/couch_replicator.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.hrl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.hrl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator.hrl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(REP_ID_VERSION, 2).
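+% Bumped whenever the replication id generation algorithm changes. The
+% replicator falls back to ids computed with older versions when looking
+% up existing checkpoint logs (see fold_replication_logs/6 in
+% couch_replicator.erl).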
+
+-record(rep, {
+ id,
+ source,
+ target,
+ options,
+ user_ctx,
+ doc
+}).
+
+-record(rep_stats, {
+ missing_checked = 0,
+ missing_found = 0,
+ docs_read = 0,
+ docs_written = 0,
+ doc_write_failures = 0
+}).