Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3588170B0 for ; Mon, 5 Dec 2011 09:34:26 +0000 (UTC) Received: (qmail 42622 invoked by uid 500); 5 Dec 2011 09:34:25 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 42581 invoked by uid 500); 5 Dec 2011 09:34:25 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 42574 invoked by uid 99); 5 Dec 2011 09:34:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Dec 2011 09:34:24 +0000 X-ASF-Spam-Status: No, hits=-1998.7 required=5.0 tests=ALL_TRUSTED,URI_HEX X-Spam-Check-By: apache.org Received: from [140.211.11.114] (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Dec 2011 09:34:14 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 53731561C1; Mon, 5 Dec 2011 09:33:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benoitc@apache.org To: commits@couchdb.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [2/8] refactor couch_replicator. close #COUCHDB-1323 . 
Message-Id: <20111205093330.53731561C1@tyr.zones.apache.org> Date: Mon, 5 Dec 2011 09:33:29 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 40cb9a4..f321bf8 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -25,7 +25,7 @@ -export([handle_call/3, handle_cast/2, handle_info/2]). -include("couch_db.hrl"). --include("couch_api_wrap.hrl"). +-include("couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -import(couch_util, [ @@ -84,7 +84,7 @@ replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) -> false -> {ok, Listener} = rep_result_listener(RepId), Result = do_replication_loop(Rep), - couch_replication_notifier:stop(Listener), + couch_replicator_notifier:stop(Listener), Result end. 
@@ -105,8 +105,8 @@ do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) -> async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> RepChildId = BaseId ++ Ext, - Source = couch_api_wrap:db_uri(Src), - Target = couch_api_wrap:db_uri(Tgt), + Source = couch_replicator_api_wrap:db_uri(Src), + Target = couch_replicator_api_wrap:db_uri(Tgt), Timeout = get_value(connection_timeout, Rep#rep.options), ChildSpec = { RepChildId, @@ -122,13 +122,13 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> % % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html % - case supervisor:start_child(couch_rep_sup, ChildSpec) of + case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of {ok, Pid} -> ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)", [RepChildId, Pid, Source, Target]), {ok, Pid}; {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, RepChildId) of + case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of {ok, Pid} -> ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)", [RepChildId, Pid, Source, Target]), @@ -138,7 +138,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> %% each other to start and somebody else won. Just grab %% the Pid by calling start_child again. {error, {already_started, Pid}} = - supervisor:start_child(couch_rep_sup, ChildSpec), + supervisor:start_child(couch_replicator_job_sup, ChildSpec), ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)", [RepChildId, Pid, Source, Target]), {ok, Pid}; @@ -147,7 +147,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> % Clause to deal with a change in the supervisor module introduced % in R14B02. 
For more details consult the thread at: % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html - _ = supervisor:delete_child(couch_rep_sup, RepChildId), + _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId), async_replicate(Rep); {error, _} = Error -> Error @@ -163,7 +163,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> rep_result_listener(RepId) -> ReplyTo = self(), - {ok, _Listener} = couch_replication_notifier:start_link( + {ok, _Listener} = couch_replicator_notifier:start_link( fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> ReplyTo ! Ev; (_) -> @@ -183,10 +183,10 @@ wait_for_result(RepId) -> cancel_replication({BaseId, Extension}) -> FullRepId = BaseId ++ Extension, ?LOG_INFO("Canceling replication `~s`...", [FullRepId]), - case supervisor:terminate_child(couch_rep_sup, FullRepId) of + case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of ok -> ?LOG_INFO("Replication `~s` canceled.", [FullRepId]), - case supervisor:delete_child(couch_rep_sup, FullRepId) of + case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of ok -> {ok, {cancelled, ?l2b(FullRepId)}}; {error, not_found} -> @@ -206,7 +206,7 @@ cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) -> false -> {BaseId, Ext} = RepId, case lists:keysearch( - BaseId ++ Ext, 1, supervisor:which_children(couch_rep_sup)) of + BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of {value, {_, Pid, _, _}} when is_pid(Pid) -> case (catch gen_server:call(Pid, get_details, infinity)) of {ok, #rep{user_ctx = #user_ctx{name = Name}}} -> @@ -321,7 +321,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) -> ?LOG_DEBUG("Worker pids are: ~p", [Workers]), - couch_replication_manager:replication_started(Rep), + couch_replicator_manager:replication_started(Rep), {ok, State#rep_state{ changes_queue = ChangesQueue, @@ -458,12 +458,12 @@ code_change(_OldVsn, State, _Extra) -> terminate(normal, 
#rep_state{rep_details = #rep{id = RepId} = Rep, checkpoint_history = CheckpointHistory} = State) -> terminate_cleanup(State), - couch_replication_notifier:notify({finished, RepId, CheckpointHistory}), - couch_replication_manager:replication_completed(Rep); + couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}), + couch_replicator_manager:replication_completed(Rep); terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> % cancelled replication throught ?MODULE:cancel_replication/1 - couch_replication_notifier:notify({error, RepId, <<"cancelled">>}), + couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}), terminate_cleanup(State); terminate(Reason, State) -> @@ -475,16 +475,16 @@ terminate(Reason, State) -> ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s", [BaseId ++ Ext, Source, Target, to_binary(Reason)]), terminate_cleanup(State), - couch_replication_notifier:notify({error, RepId, Reason}), - couch_replication_manager:replication_error(Rep, Reason). + couch_replicator_notifier:notify({error, RepId, Reason}), + couch_replicator_manager:replication_error(Rep, Reason). terminate_cleanup(State) -> update_task(State), stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier), stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier), - couch_api_wrap:db_close(State#rep_state.source), - couch_api_wrap:db_close(State#rep_state.target). + couch_replicator_api_wrap:db_close(State#rep_state.source), + couch_replicator_api_wrap:db_close(State#rep_state.target). 
do_last_checkpoint(#rep_state{seqs_in_progress = [], @@ -523,12 +523,12 @@ init_state(Rep) -> source = Src, target = Tgt, options = Options, user_ctx = UserCtx } = Rep, - {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]), - {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}], + {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]), + {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}], get_value(create_target, Options, false)), - {ok, SourceInfo} = couch_api_wrap:get_db_info(Source), - {ok, TargetInfo} = couch_api_wrap:get_db_info(Target), + {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source), + {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target), [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep), @@ -538,8 +538,8 @@ init_state(Rep) -> #doc{body={CheckpointHistory}} = SourceLog, State = #rep_state{ rep_details = Rep, - source_name = couch_api_wrap:db_uri(Source), - target_name = couch_api_wrap:db_uri(Target), + source_name = couch_replicator_api_wrap:db_uri(Source), + target_name = couch_replicator_api_wrap:db_uri(Target), source = Source, target = Target, history = History, @@ -573,7 +573,7 @@ fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) -> lists:reverse(Acc); fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> - case couch_api_wrap:open_doc(Db, LogId, [ejson_body]) of + case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of {error, <<"not_found">>} when Vsn > 1 -> OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1), fold_replication_logs(Dbs, Vsn - 1, @@ -604,7 +604,7 @@ spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) -> read_changes(StartSeq, Db, ChangesQueue, Options) -> try - couch_api_wrap:changes_since(Db, all_docs, StartSeq, + couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq, fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) -> case Id of <<>> -> @@ -613,7 
+613,7 @@ read_changes(StartSeq, Db, ChangesQueue, Options) -> % is impossible to GET. ?LOG_ERROR("Replicator: ignoring document with empty ID in " "source database `~s` (_changes sequence ~p)", - [couch_api_wrap:db_uri(Db), Seq]); + [couch_replicator_api_wrap:db_uri(Db), Seq]); _ -> ok = couch_work_queue:queue(ChangesQueue, DocInfo) end, @@ -629,12 +629,12 @@ read_changes(StartSeq, Db, ChangesQueue, Options) -> StartSeq -> ?LOG_INFO("Retrying _changes request to source database ~s" " with since=~p in ~p seconds", - [couch_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]), + [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]), ok = timer:sleep(Db#httpdb.wait), Db#httpdb{wait = 2 * Db#httpdb.wait}; _ -> ?LOG_INFO("Retrying _changes request to source database ~s" - " with since=~p", [couch_api_wrap:db_uri(Db), LastSeq]), + " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]), Db end, read_changes(LastSeq, Db2, ChangesQueue, Options); @@ -782,14 +782,14 @@ update_checkpoint(Db, Doc, DbType) -> update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> try - case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of + case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of {ok, PosRevId} -> PosRevId; {error, Reason} -> throw({checkpoint_commit_failure, Reason}) end catch throw:conflict -> - case (catch couch_api_wrap:open_doc(Db, LogId, [ejson_body])) of + case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} -> % This means that we were able to update successfully the % checkpoint doc in a previous attempt but we got a connection @@ -810,12 +810,12 @@ commit_to_both(Source, Target) -> ParentPid = self(), SrcCommitPid = spawn_link( fun() -> - Result = (catch couch_api_wrap:ensure_full_commit(Source)), + Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)), ParentPid ! 
{self(), Result} end), % commit tgt sync - TargetResult = (catch couch_api_wrap:ensure_full_commit(Target)), + TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)), SourceResult = receive {SrcCommitPid, Result} -> @@ -902,14 +902,14 @@ db_monitor(_HttpDb) -> source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) -> - case (catch couch_api_wrap:get_db_info(Db#httpdb{retries = 3})) of + case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of {ok, Info} -> get_value(<<"update_seq">>, Info, Seq); _ -> Seq end; source_cur_seq(#rep_state{source = Db, source_seq = Seq}) -> - {ok, Info} = couch_api_wrap:get_db_info(Db), + {ok, Info} = couch_replicator_api_wrap:get_db_info(Db), get_value(<<"update_seq">>, Info, Seq). http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_api_wrap.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl new file mode 100644 index 0000000..a29fe94 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -0,0 +1,775 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_api_wrap). + +% This module wraps the native erlang API, and allows for performing +% operations on a remote vs. local databases via the same API. 
+% +% Notes: +% Many options and apis aren't yet supported here, they are added as needed. + +-include("couch_db.hrl"). +-include("couch_replicator_api_wrap.hrl"). + +-export([ + db_open/2, + db_open/3, + db_close/1, + get_db_info/1, + update_doc/3, + update_doc/4, + update_docs/3, + update_docs/4, + ensure_full_commit/1, + get_missing_revs/2, + open_doc/3, + open_doc_revs/6, + changes_since/5, + db_uri/1 + ]). + +-import(couch_replicator_httpc, [ + send_req/3 + ]). + +-import(couch_util, [ + encode_doc_id/1, + get_value/2, + get_value/3 + ]). + + +db_uri(#httpdb{url = Url}) -> + couch_util:url_strip_password(Url); + +db_uri(#db{name = Name}) -> + db_uri(Name); + +db_uri(DbName) -> + ?b2l(DbName). + + +db_open(Db, Options) -> + db_open(Db, Options, false). + +db_open(#httpdb{} = Db1, _Options, Create) -> + {ok, Db} = couch_replicator_httpc:setup(Db1), + case Create of + false -> + ok; + true -> + send_req(Db, [{method, put}], fun(_, _, _) -> ok end) + end, + send_req(Db, [{method, head}], + fun(200, _, _) -> + {ok, Db}; + (401, _, _) -> + throw({unauthorized, ?l2b(db_uri(Db))}); + (_, _, _) -> + throw({db_not_found, ?l2b(db_uri(Db))}) + end); +db_open(DbName, Options, Create) -> + try + case Create of + false -> + ok; + true -> + ok = couch_httpd:verify_is_server_admin( + get_value(user_ctx, Options)), + couch_db:create(DbName, Options) + end, + case couch_db:open(DbName, Options) of + {not_found, _Reason} -> + throw({db_not_found, DbName}); + {ok, _Db} = Success -> + Success + end + catch + throw:{unauthorized, _} -> + throw({unauthorized, DbName}) + end. + +db_close(#httpdb{httpc_pool = Pool}) -> + unlink(Pool), + ok = couch_replicator_httpc_pool:stop(Pool); +db_close(DbName) -> + catch couch_db:close(DbName). 
+ + +get_db_info(#httpdb{} = Db) -> + send_req(Db, [], + fun(200, _, {Props}) -> + {ok, Props} + end); +get_db_info(#db{name = DbName, user_ctx = UserCtx}) -> + {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + {ok, Info} = couch_db:get_db_info(Db), + couch_db:close(Db), + {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}. + + +ensure_full_commit(#httpdb{} = Db) -> + send_req( + Db, + [{method, post}, {path, "_ensure_full_commit"}, + {headers, [{"Content-Type", "application/json"}]}], + fun(201, _, {Props}) -> + {ok, get_value(<<"instance_start_time">>, Props)}; + (_, _, {Props}) -> + {error, get_value(<<"error">>, Props)} + end); +ensure_full_commit(Db) -> + couch_db:ensure_full_commit(Db). + + +get_missing_revs(#httpdb{} = Db, IdRevs) -> + JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]}, + send_req( + Db, + [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}], + fun(200, _, {Props}) -> + ConvertToNativeFun = fun({Id, {Result}}) -> + MissingRevs = couch_doc:parse_revs( + get_value(<<"missing">>, Result) + ), + PossibleAncestors = couch_doc:parse_revs( + get_value(<<"possible_ancestors">>, Result, []) + ), + {Id, MissingRevs, PossibleAncestors} + end, + {ok, lists:map(ConvertToNativeFun, Props)} + end); +get_missing_revs(Db, IdRevs) -> + couch_db:get_missing_revs(Db, IdRevs). 
+ + + +open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> + Path = encode_doc_id(Id), + QArgs = options_to_query_args( + HttpDb, Path, [revs, {open_revs, Revs} | Options]), + Self = self(), + Streamer = spawn_link(fun() -> + send_req( + HttpDb, + [{path, Path}, {qs, QArgs}, + {ibrowse_options, [{stream_to, {self(), once}}]}, + {headers, [{"Accept", "multipart/mixed"}]}], + fun(200, Headers, StreamDataFun) -> + remote_open_doc_revs_streamer_start(Self), + {<<"--">>, _, _} = couch_httpd:parse_multipart_request( + get_value("Content-Type", Headers), + StreamDataFun, + fun mp_parse_mixed/1) + end), + unlink(Self) + end), + receive + {started_open_doc_revs, Ref} -> + receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) + end; +open_doc_revs(Db, Id, Revs, Options, Fun, Acc) -> + {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options), + {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}. + + +open_doc(#httpdb{} = Db, Id, Options) -> + send_req( + Db, + [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}], + fun(200, _, Body) -> + {ok, couch_doc:from_json_obj(Body)}; + (_, _, {Props}) -> + {error, get_value(<<"error">>, Props)} + end); +open_doc(Db, Id, Options) -> + case couch_db:open_doc(Db, Id, Options) of + {ok, _} = Ok -> + Ok; + {not_found, _Reason} -> + {error, <<"not_found">>} + end. + + +update_doc(Db, Doc, Options) -> + update_doc(Db, Doc, Options, interactive_edit). 
+ +update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) -> + QArgs = case Type of + replicated_changes -> + [{"new_edits", "false"}]; + _ -> + [] + end ++ options_to_query_args(Options, []), + Boundary = couch_uuids:random(), + JsonBytes = ?JSON_ENCODE( + couch_doc:to_json_obj( + Doc, [revs, attachments, follows, att_encoding_info | Options])), + {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary, + JsonBytes, Doc#doc.atts, true), + Headers = case lists:member(delay_commit, Options) of + true -> + [{"X-Couch-Full-Commit", "false"}]; + false -> + [] + end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}], + Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}}, + send_req( + HttpDb, + [{method, put}, {path, encode_doc_id(DocId)}, + {qs, QArgs}, {headers, Headers}, {body, Body}], + fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 -> + {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))}; + (409, _, _) -> + throw(conflict); + (Code, _, {Props}) -> + case {Code, get_value(<<"error">>, Props)} of + {401, <<"unauthorized">>} -> + throw({unauthorized, get_value(<<"reason">>, Props)}); + {403, <<"forbidden">>} -> + throw({forbidden, get_value(<<"reason">>, Props)}); + {412, <<"missing_stub">>} -> + throw({missing_stub, get_value(<<"reason">>, Props)}); + {_, Error} -> + {error, Error} + end + end); +update_doc(Db, Doc, Options, Type) -> + couch_db:update_doc(Db, Doc, Options, Type). + + +update_docs(Db, DocList, Options) -> + update_docs(Db, DocList, Options, interactive_edit). 
+ +update_docs(_Db, [], _Options, _UpdateType) -> + {ok, []}; +update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) -> + FullCommit = atom_to_list(not lists:member(delay_commit, Options)), + Prefix = case UpdateType of + replicated_changes -> + <<"{\"new_edits\":false,\"docs\":[">>; + interactive_edit -> + <<"{\"docs\":[">> + end, + Suffix = <<"]}">>, + % Note: nginx and other servers don't like PUT/POST requests without + % a Content-Length header, so we can't do a chunked transfer encoding + % and JSON encode each doc only before sending it through the socket. + {Docs, Len} = lists:mapfoldl( + fun(#doc{} = Doc, Acc) -> + Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])), + {Json, Acc + iolist_size(Json)}; + (Doc, Acc) -> + {Doc, Acc + iolist_size(Doc)} + end, + byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1, + DocList), + BodyFun = fun(eof) -> + eof; + ([]) -> + {ok, Suffix, eof}; + ([prefix | Rest]) -> + {ok, Prefix, Rest}; + ([Doc]) -> + {ok, Doc, []}; + ([Doc | RestDocs]) -> + {ok, [Doc, ","], RestDocs} + end, + Headers = [ + {"Content-Length", Len}, + {"Content-Type", "application/json"}, + {"X-Couch-Full-Commit", FullCommit} + ], + send_req( + HttpDb, + [{method, post}, {path, "_bulk_docs"}, + {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}], + fun(201, _, Results) when is_list(Results) -> + {ok, bulk_results_to_errors(DocList, Results, remote)}; + (417, _, Results) when is_list(Results) -> + {ok, bulk_results_to_errors(DocList, Results, remote)} + end); +update_docs(Db, DocList, Options, UpdateType) -> + Result = couch_db:update_docs(Db, DocList, Options, UpdateType), + {ok, bulk_results_to_errors(DocList, Result, UpdateType)}. 
+ + +changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq, + UserFun, Options) -> + BaseQArgs = case get_value(continuous, Options, false) of + false -> + [{"feed", "normal"}]; + true -> + [{"feed", "continuous"}, {"heartbeat", "10000"}] + end ++ [ + {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)} + ], + DocIds = get_value(doc_ids, Options), + {QArgs, Method, Body, Headers} = case DocIds of + undefined -> + QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options), + {QArgs1, get, [], Headers1}; + _ when is_list(DocIds) -> + Headers2 = [{"Content-Type", "application/json"} | Headers1], + JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}), + {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2} + end, + send_req( + HttpDb, + [{method, Method}, {path, "_changes"}, {qs, QArgs}, + {headers, Headers}, {body, Body}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(200, _, DataStreamFun) -> + parse_changes_feed(Options, UserFun, DataStreamFun); + (405, _, _) when is_list(DocIds) -> + % CouchDB versions < 1.1.0 don't have the builtin _changes feed + % filter "_doc_ids" neither support POST + send_req(HttpDb, [{method, get}, {path, "_changes"}, + {qs, BaseQArgs}, {headers, Headers1}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(200, _, DataStreamFun2) -> + UserFun2 = fun(#doc_info{id = Id} = DocInfo) -> + case lists:member(Id, DocIds) of + true -> + UserFun(DocInfo); + false -> + ok + end + end, + parse_changes_feed(Options, UserFun2, DataStreamFun2) + end) + end); +changes_since(Db, Style, StartSeq, UserFun, Options) -> + Filter = case get_value(doc_ids, Options) of + undefined -> + ?b2l(get_value(filter, Options, <<>>)); + _DocIds -> + "_doc_ids" + end, + Args = #changes_args{ + style = Style, + since = StartSeq, + filter = Filter, + feed = case get_value(continuous, Options, false) of + true -> + "continuous"; + false -> + "normal" + end, + timeout = infinity + }, + QueryParams = 
get_value(query_params, Options, {[]}), + Req = changes_json_req(Db, Filter, QueryParams, Options), + ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db), + ChangesFeedFun(fun({change, Change, _}, _) -> + UserFun(json_to_doc_info(Change)); + (_, _) -> + ok + end). + + +% internal functions + +maybe_add_changes_filter_q_args(BaseQS, Options) -> + case get_value(filter, Options) of + undefined -> + BaseQS; + FilterName -> + {Params} = get_value(query_params, Options, {[]}), + [{"filter", ?b2l(FilterName)} | lists:foldl( + fun({K, V}, QSAcc) -> + Ks = couch_util:to_list(K), + case lists:keymember(Ks, 1, QSAcc) of + true -> + QSAcc; + false -> + [{Ks, couch_util:to_list(V)} | QSAcc] + end + end, + BaseQS, Params)] + end. + +parse_changes_feed(Options, UserFun, DataStreamFun) -> + case get_value(continuous, Options, false) of + true -> + continuous_changes(DataStreamFun, UserFun); + false -> + EventFun = fun(Ev) -> + changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, []) + end, + json_stream_parse:events(DataStreamFun, EventFun) + end. + +changes_json_req(_Db, "", _QueryParams, _Options) -> + {[]}; +changes_json_req(_Db, "_doc_ids", _QueryParams, Options) -> + {[{<<"doc_ids">>, get_value(doc_ids, Options)}]}; +changes_json_req(Db, FilterName, {QueryParams}, _Options) -> + {ok, Info} = couch_db:get_db_info(Db), + % simulate a request to db_name/_changes + {[ + {<<"info">>, {Info}}, + {<<"id">>, null}, + {<<"method">>, 'GET'}, + {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, + {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}}, + {<<"headers">>, []}, + {<<"body">>, []}, + {<<"peer">>, <<"replicator">>}, + {<<"form">>, []}, + {<<"cookie">>, []}, + {<<"userCtx">>, couch_util:json_user_ctx(Db)} + ]}. 
+ + +options_to_query_args(HttpDb, Path, Options) -> + case lists:keytake(atts_since, 1, Options) of + false -> + options_to_query_args(Options, []); + {value, {atts_since, []}, Options2} -> + options_to_query_args(Options2, []); + {value, {atts_since, PAs}, Options2} -> + QueryArgs1 = options_to_query_args(Options2, []), + FullUrl = couch_replicator_httpc:full_url( + HttpDb, [{path, Path}, {qs, QueryArgs1}]), + RevList = atts_since_arg( + length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") + + length("&atts_since=") + 6, % +6 = % encoded [ and ] + PAs, []), + [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1] + end. + + +options_to_query_args([], Acc) -> + lists:reverse(Acc); +options_to_query_args([ejson_body | Rest], Acc) -> + options_to_query_args(Rest, Acc); +options_to_query_args([delay_commit | Rest], Acc) -> + options_to_query_args(Rest, Acc); +options_to_query_args([revs | Rest], Acc) -> + options_to_query_args(Rest, [{"revs", "true"} | Acc]); +options_to_query_args([{open_revs, all} | Rest], Acc) -> + options_to_query_args(Rest, [{"open_revs", "all"} | Acc]); +options_to_query_args([{open_revs, Revs} | Rest], Acc) -> + JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))), + options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]). + + +-define(MAX_URL_LEN, 7000). + +atts_since_arg(_UrlLen, [], Acc) -> + lists:reverse(Acc); +atts_since_arg(UrlLen, [PA | Rest], Acc) -> + RevStr = couch_doc:rev_to_str(PA), + NewUrlLen = case Rest of + [] -> + % plus 2 double quotes (% encoded) + UrlLen + size(RevStr) + 6; + _ -> + % plus 2 double quotes and a comma (% encoded) + UrlLen + size(RevStr) + 9 + end, + case NewUrlLen >= ?MAX_URL_LEN of + true -> + lists:reverse(Acc); + false -> + atts_since_arg(NewUrlLen, Rest, [RevStr | Acc]) + end. + + +% TODO: A less verbose, more elegant and automatic restart strategy for +% the exported open_doc_revs/6 function. 
The restart should be +% transparent to the caller like any other Couch API function exported +% by this module. +receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) -> + try + % Left only for debugging purposes via an interactive or remote shell + erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}), + receive_docs(Streamer, Fun, Ref, Acc) + catch + error:{restart_open_doc_revs, NewRef} -> + receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc) + end. + +receive_docs(Streamer, UserFun, Ref, UserAcc) -> + Streamer ! {get_headers, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + restart_remote_open_doc_revs(Ref, NewRef); + {headers, Ref, Headers} -> + case get_value("content-type", Headers) of + {"multipart/related", _} = ContentType -> + case doc_from_multi_part_stream( + ContentType, + fun() -> receive_doc_data(Streamer, Ref) end, + Ref) of + {ok, Doc, Parser} -> + case UserFun({ok, Doc}, UserAcc) of + {ok, UserAcc2} -> + ok; + {skip, UserAcc2} -> + couch_doc:abort_multi_part_stream(Parser) + end, + receive_docs(Streamer, UserFun, Ref, UserAcc2) + end; + {"application/json", []} -> + Doc = couch_doc:from_json_obj( + ?JSON_DECODE(receive_all(Streamer, Ref, []))), + {_, UserAcc2} = UserFun({ok, Doc}, UserAcc), + receive_docs(Streamer, UserFun, Ref, UserAcc2); + {"application/json", [{"error","true"}]} -> + {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])), + Rev = get_value(<<"missing">>, ErrorProps), + Result = {{not_found, missing}, couch_doc:parse_rev(Rev)}, + {_, UserAcc2} = UserFun(Result, UserAcc), + receive_docs(Streamer, UserFun, Ref, UserAcc2) + end; + {done, Ref} -> + {ok, UserAcc} + end. 
+ + +restart_remote_open_doc_revs(Ref, NewRef) -> + receive + {body_bytes, Ref, _} -> + restart_remote_open_doc_revs(Ref, NewRef); + {body_done, Ref} -> + restart_remote_open_doc_revs(Ref, NewRef); + {done, Ref} -> + restart_remote_open_doc_revs(Ref, NewRef); + {headers, Ref, _} -> + restart_remote_open_doc_revs(Ref, NewRef) + after 0 -> + erlang:error({restart_open_doc_revs, NewRef}) + end. + + +remote_open_doc_revs_streamer_start(Parent) -> + receive + {get_headers, _Ref, Parent} -> + remote_open_doc_revs_streamer_start(Parent); + {next_bytes, _Ref, Parent} -> + remote_open_doc_revs_streamer_start(Parent) + after 0 -> + Parent ! {started_open_doc_revs, make_ref()} + end. + + +receive_all(Streamer, Ref, Acc) -> + Streamer ! {next_bytes, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + restart_remote_open_doc_revs(Ref, NewRef); + {body_bytes, Ref, Bytes} -> + receive_all(Streamer, Ref, [Bytes | Acc]); + {body_done, Ref} -> + lists:reverse(Acc) + end. + + +mp_parse_mixed(eof) -> + receive {get_headers, Ref, From} -> + From ! {done, Ref} + end; +mp_parse_mixed({headers, H}) -> + receive {get_headers, Ref, From} -> + From ! {headers, Ref, H} + end, + fun mp_parse_mixed/1; +mp_parse_mixed({body, Bytes}) -> + receive {next_bytes, Ref, From} -> + From ! {body_bytes, Ref, Bytes} + end, + fun mp_parse_mixed/1; +mp_parse_mixed(body_end) -> + receive {next_bytes, Ref, From} -> + From ! {body_done, Ref}; + {get_headers, Ref, From} -> + self() ! {get_headers, Ref, From} + end, + fun mp_parse_mixed/1. + + +receive_doc_data(Streamer, Ref) -> + Streamer ! {next_bytes, Ref, self()}, + receive + {body_bytes, Ref, Bytes} -> + {Bytes, fun() -> receive_doc_data(Streamer, Ref) end}; + {body_done, Ref} -> + {<<>>, fun() -> receive_doc_data(Streamer, Ref) end} + end. 
+ +doc_from_multi_part_stream(ContentType, DataFun, Ref) -> + Self = self(), + Parser = spawn_link(fun() -> + {<<"--">>, _, _} = couch_httpd:parse_multipart_request( + ContentType, DataFun, + fun(Next) -> couch_doc:mp_parse_doc(Next, []) end), + unlink(Self) + end), + Parser ! {get_doc_bytes, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + unlink(Parser), + exit(Parser, kill), + restart_remote_open_doc_revs(Ref, NewRef); + {doc_bytes, Ref, DocBytes} -> + Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)), + ReadAttachmentDataFun = fun() -> + Parser ! {get_bytes, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + unlink(Parser), + exit(Parser, kill), + receive {bytes, Ref, _} -> ok after 0 -> ok end, + restart_remote_open_doc_revs(Ref, NewRef); + {bytes, Ref, Bytes} -> + Bytes + end + end, + Atts2 = lists:map( + fun(#att{data = follows} = A) -> + A#att{data = ReadAttachmentDataFun}; + (A) -> + A + end, Doc#doc.atts), + {ok, Doc#doc{atts = Atts2}, Parser} + end. + + +changes_ev1(object_start, UserFun, UserAcc) -> + fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end. + +changes_ev2({key, <<"results">>}, UserFun, UserAcc) -> + fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end; +changes_ev2(_, UserFun, UserAcc) -> + fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end. + +changes_ev3(array_start, UserFun, UserAcc) -> + fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end. + +changes_ev_loop(object_start, UserFun, UserAcc) -> + fun(Ev) -> + json_stream_parse:collect_object(Ev, + fun(Obj) -> + UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc), + fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end + end) + end; +changes_ev_loop(array_end, _UserFun, _UserAcc) -> + fun(_Ev) -> changes_ev_done() end. + +changes_ev_done() -> + fun(_Ev) -> changes_ev_done() end. 
%% Consume a continuous _changes feed forever. Each line is one JSON object;
%% UserFun is called with its #doc_info{}. Parsing restarts on the bytes left
%% over (Rest) after each object. This function never returns normally.
continuous_changes(DataFun, UserFun) ->
    {DataFun2, _, Rest} = json_stream_parse:events(
        DataFun,
        fun(Ev) -> parse_changes_line(Ev, UserFun) end),
    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).

%% Event handler for one continuous-feed line: collect the object and pass it
%% to UserFun as a #doc_info{}.
parse_changes_line(object_start, UserFun) ->
    fun(Ev) ->
        json_stream_parse:collect_object(Ev,
            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
    end.

%% Convert one EJSON _changes row into a #doc_info{}. A revision is marked
%% deleted only when its "deleted" field is exactly true.
json_to_doc_info({Props}) ->
    RevsInfo = lists:map(
        fun({Change}) ->
            Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
            Del = (true =:= get_value(<<"deleted">>, Change)),
            #rev_info{rev=Rev, deleted=Del}
        end, get_value(<<"changes">>, Props)),
    #doc_info{
        id = get_value(<<"id">>, Props),
        high_seq = get_value(<<"seq">>, Props),
        revs = RevsInfo
    }.


%% Turn the result of a bulk document write into a list of EJSON error
%% objects {[{id,_}, {rev,_}, {error,_}, {reason,_}]}, one per failed
%% document, in input order. Successful writes are omitted.
%%
%% The third argument says how Results was produced: a local interactive
%% edit, local replicated_changes, or a remote _bulk_docs response.
bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
    lists:reverse(lists:foldl(
        fun({_, {ok, _}}, Acc) ->
            Acc;
        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Err}, Acc) ->
            % Bind the raw failure term as Err (as the aborted clause below
            % does). Reusing the name Error would turn the match against
            % error_info/1's result into an equality assertion against the
            % input term, which fails with badmatch.
            {_, Error, Reason} = couch_httpd:error_info(Err),
            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
                {error, Error}, {reason, Reason}]} | Acc ]
        end,
        [], lists:zip(Docs, Results)));

bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);

bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
    lists:map(
        fun({{Id, Rev}, Err}) ->
            {_, Error, Reason} = couch_httpd:error_info(Err),
            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
        end,
        Results);

bulk_results_to_errors(_Docs, Results, remote) ->
    % Remote rows may use binary or atom keys; binary keys win when both exist.
    lists:reverse(lists:foldl(
        fun({Props}, Acc) ->
            case get_value(<<"error">>, Props, get_value(error, Props)) of
            undefined ->
                Acc;
            Error ->
                Id = get_value(<<"id">>, Props, get_value(id, Props)),
                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
                [ {[{id, Id}, {rev, rev_to_str(Rev)},
                    {error, Error}, {reason, Reason}]} | Acc ]
            end
        end,
        [], Results)).


%% Render a {Pos, RevId} revision as a string; any other value (e.g. an
%% already-rendered binary) is passed through unchanged.
rev_to_str({_Pos, _Id} = Rev) ->
    couch_doc:rev_to_str(Rev);
rev_to_str(Rev) ->
    Rev.


%% Body-generator for streaming a multipart document upload.
%%
%% The initial call, with {JsonBytes, Atts, Boundary, Len}, does three things:
%%   - kills any previous streamer registered for the same Boundary;
%%   - spawns a linked streamer running couch_doc:doc_to_multi_part_stream/5,
%%     which serves one chunk per {get_data, Ref, From} request;
%%   - records the streamer in the process dictionary.
%% Later calls pull chunks until the remaining length reaches 0, then return
%% eof.
%%
%% NOTE(review): the {ok, Data, State} | eof shape looks like ibrowse's
%% streaming-body fun contract -- confirm.
stream_doc({JsonBytes, Atts, Boundary, Len}) ->
    case erlang:erase({doc_streamer, Boundary}) of
    Pid when is_pid(Pid) ->
        unlink(Pid),
        exit(Pid, kill);
    _ ->
        ok
    end,
    Self = self(),
    DocStreamer = spawn_link(fun() ->
        couch_doc:doc_to_multi_part_stream(
            Boundary, JsonBytes, Atts,
            fun(Data) ->
                receive {get_data, Ref, From} ->
                    From ! {data, Ref, Data}
                end
            end, true),
        unlink(Self)
    end),
    erlang:put({doc_streamer, Boundary}, DocStreamer),
    {ok, <<>>, {Len, Boundary}};
stream_doc({0, Id}) ->
    erlang:erase({doc_streamer, Id}),
    eof;
stream_doc({LenLeft, Id}) when LenLeft > 0 ->
    Ref = make_ref(),
    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
    receive {data, Ref, Data} ->
        {ok, Data, {LenLeft - iolist_size(Data), Id}}
    end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
new file mode 100644
index 0000000..1a6f27a
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -0,0 +1,36 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.


%% Connection description for a remote (HTTP) database endpoint.
-record(httpdb, {
    url,
    oauth = nil,           % nil or an #oauth{} record
    % Default request headers. The default expression calls
    % couch_server:get_version() each time a record is built without an
    % explicit headers value.
    headers = [
        {"Accept", "application/json"},
        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
    ],
    timeout,            % milliseconds
    ibrowse_options = [],
    retries = 10,
    wait = 250,         % milliseconds
    httpc_pool = nil,      % pool pid once setup/1 has run
    http_connections       % max pool size passed to the pool as max_connections
}).

%% OAuth 1.0 credentials used to sign requests.
-record(oauth, {
    consumer_key,
    token,
    token_secret,
    consumer_secret,
    signature_method
}).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
new file mode 100644
index 0000000..6804448
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -0,0 +1,286 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_httpc).

-include("couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
-include("../ibrowse/ibrowse.hrl").

-export([setup/1]).
-export([send_req/3]).
-export([full_url/2]).

-import(couch_util, [
    get_value/2,
    get_value/3
]).

% Set key K to V in proplist L, replacing an existing entry or appending.
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
% Upper bound (milliseconds) for the doubling retry wait in maybe_retry/5.
-define(MAX_WAIT, 5 * 60 * 1000).


%% Attach a fresh connection pool to Db. Only matches while no pool is set
%% (httpc_pool = nil); a second call on the same record fails with
%% function_clause.
setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
    {ok, Db#httpdb{httpc_pool = Pid}}.


%% Perform one HTTP request against HttpDb and pass the outcome to Callback
%% (see process_response/5).
%%
%% Before sending:
%%   - qs values (any iolist) are converted to character lists;
%%   - ibrowse_options are key-sorted so they can be merged with
%%     lists:ukeymerge/3 in send_ibrowse_req/2.
send_req(HttpDb, Params1, Callback) ->
    Params2 = ?replace(Params1, qs,
        [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
    Params = ?replace(Params2, ibrowse_options,
        lists:keysort(1, get_value(ibrowse_options, Params2, []))),
    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
    process_response(Response, Worker, HttpDb, Params, Callback).


%% Build headers, URL and options, acquire a worker and send the request
%% synchronously through it. Returns {Worker, RawIbrowseResponse}.
%%
%% Per-request headers and ibrowse options override HttpDb's defaults for
%% equal keys (lists:ukeymerge/3 keeps the element from its first list).
%% NOTE(review): ukeymerge assumes both lists are key-sorted; HttpDb's
%% headers and ibrowse_options are not sorted here -- confirm callers keep
%% them sorted.
send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
    Method = get_value(method, Params, get),
    UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
    Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
    Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
    Url = full_url(HttpDb, Params),
    Body = get_value(body, Params, []),
    % _changes requests get a dedicated linked ibrowse worker instead of a
    % pooled one (presumably because the feed may hold the connection for a
    % long time -- confirm). Releasing such a worker to the pool later is a
    % no-op, since the pool only accepts workers in its busy list.
    case get_value(path, Params) of
    "_changes" ->
        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
    _ ->
        {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
    end,
    IbrowseOptions = [
        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
        lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
            HttpDb#httpdb.ibrowse_options)
    ],
    Response = ibrowse:send_req_direct(
        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
    {Worker, Response}.


%% Dispatch on the raw ibrowse result of send_ibrowse_req/2.
%%
%% For {ok, ...} responses the worker is handed back to the pool exactly
%% once: directly in the success branch, or inside do_redirect/6,
%% maybe_retry/5 or report_error/4, which each call release_worker/2.
process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
    % NOTE(review): retried without consuming a retry and without releasing
    % the worker; presumably the ibrowse worker has exited and the pool's
    % 'EXIT' handler reclaims it -- confirm.
    send_req(HttpDb, Params, Callback);

process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
    % ibrowse worker terminated because remote peer closed the socket
    % -> not an error
    send_req(HttpDb, Params, Cb);

process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
    % Asynchronous request: headers and body arrive as messages.
    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);

process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
    case list_to_integer(Code) of
    Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
        % Release only on this path. do_redirect/6 and maybe_retry/5 release
        % the worker themselves. Releasing unconditionally before the case
        % returned the worker twice on those paths, which lets the pool give
        % one connection to two waiting clients.
        release_worker(Worker, HttpDb),
        EJson = case Body of
        <<>> ->
            null;
        Json ->
            ?JSON_DECODE(Json)
        end,
        Callback(Ok, Headers, EJson);
    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
    Error ->
        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
    end;

process_response(Error, Worker, HttpDb, Params, Callback) ->
    maybe_retry(Error, Worker, HttpDb, Params, Callback).


%% Handle an asynchronous (streamed) ibrowse response identified by ReqId.
%%
%% On 200/201/4xx, Callback(Code, Headers, StreamDataFun) consumes the body
%% through StreamDataFun (see stream_data_self/5).
%%   - If the callback completes, the worker is released, leftover ReqId
%%     messages are flushed, and the callback's result is returned.
%%   - If the stream throws {maybe_retry_req, Err}, the request is retried.
%%
%% On other status codes:
%%   - 301/302/303 are followed via do_redirect/6;
%%   - any other code goes to report_error/4 without a retry, which exits
%%     the process.
%%
%% ibrowse errors and a missing reply within timeout + 500 ms are retried.
process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
    receive
    {ibrowse_async_headers, ReqId, Code, Headers} ->
        case list_to_integer(Code) of
        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
            StreamDataFun = fun() ->
                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
            end,
            % Ask ibrowse for the first body chunk.
            ibrowse:stream_next(ReqId),
            try
                Ret = Callback(Ok, Headers, StreamDataFun),
                release_worker(Worker, HttpDb),
                clean_mailbox_req(ReqId),
                Ret
            catch throw:{maybe_retry_req, Err} ->
                clean_mailbox_req(ReqId),
                % maybe_retry/5 releases the worker.
                maybe_retry(Err, Worker, HttpDb, Params, Callback)
            end;
        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
        Error ->
            report_error(Worker, HttpDb, Params, {code, Error})
        end;
    {ibrowse_async_response, ReqId, {error, _} = Error} ->
        maybe_retry(Error, Worker, HttpDb, Params, Callback)
    after HttpDb#httpdb.timeout + 500 ->
        % Note: ibrowse should always reply with timeouts, but this doesn't
        % seem to be always true when there's a very high rate of requests
        % and many open connections.
        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
    end.


%% Flush any remaining async body/end messages for ReqId from the mailbox.
clean_mailbox_req(ReqId) ->
    receive
    {ibrowse_async_response, ReqId, _} ->
        clean_mailbox_req(ReqId);
    {ibrowse_async_response_end, ReqId} ->
        clean_mailbox_req(ReqId)
    after 0 ->
        ok
    end.


%% Return Worker to HttpDb's connection pool (asynchronous cast).
release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
    ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).


%% Retry a failed request after waiting, or give up when no retries remain.
%% Error is only used for logging and, when giving up, is passed to
%% report_error/4 wrapped as {error, Error}.
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
    report_error(Worker, HttpDb, Params, {error, Error});

maybe_retry(Error, Worker, #httpdb{retries = Left, wait = Delay} = HttpDb,
    Params, Cb) ->
    % Hand the worker back before sleeping; send_req/3 acquires a new one.
    release_worker(Worker, HttpDb),
    MethodStr = string:to_upper(atom_to_list(get_value(method, Params, get))),
    PublicUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
        [MethodStr, PublicUrl, Delay / 1000, error_cause(Error)]),
    ok = timer:sleep(Delay),
    % Double the wait for the next attempt, capped at ?MAX_WAIT milliseconds.
    NextDb = HttpDb#httpdb{
        retries = Left - 1,
        wait = erlang:min(Delay * 2, ?MAX_WAIT)
    },
    send_req(NextDb, Params, Cb).


%% Log the failure, release the worker and exit the calling process with
%% {http_request_failed, Method, Url, Error}. The URL has its password
%% stripped. Never returns.
report_error(Worker, HttpDb, Params, Error) ->
    MethodStr = string:to_upper(atom_to_list(get_value(method, Params, get))),
    PublicUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
    do_report_error(PublicUrl, MethodStr, Error),
    release_worker(Worker, HttpDb),
    exit({http_request_failed, MethodStr, PublicUrl, Error}).


%% Write the error log line; {code, Status} failures get their own message.
do_report_error(Url, Method, {code, StatusCode}) ->
    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
        "HTTP error code is ~p", [Method, Url, StatusCode]);

do_report_error(Url, Method, Failure) ->
    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
        [Method, Url, error_cause(Failure)]).


%% Render a failure reason as a flat string, unwrapping one {error, _} layer.
error_cause(Cause) ->
    Inner = case Cause of
        {error, Wrapped} -> Wrapped;
        _ -> Cause
    end,
    lists:flatten(io_lib:format("~p", [Inner])).


%% Produce body chunks for a streamed response: returns {Data, NextFun}.
%% Empty chunks are skipped. After the end-of-response message, NextFun
%% throws {maybe_retry_req, more_data_expected}. ibrowse errors and a silence
%% longer than timeout + 500 ms also throw {maybe_retry_req, _}, which
%% process_stream_response/5 catches and turns into a retry.
stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
    receive
    {ibrowse_async_response, ReqId, {error, Error}} ->
        throw({maybe_retry_req, Error});
    {ibrowse_async_response, ReqId, <<>>} ->
        ibrowse:stream_next(ReqId),
        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
    {ibrowse_async_response, ReqId, Data} ->
        ibrowse:stream_next(ReqId),
        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
    {ibrowse_async_response_end, ReqId} ->
        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
    after T + 500 ->
        % Note: ibrowse should always reply with timeouts, but this doesn't
        % seem to be always true when there's a very high rate of requests
        % and many open connections.
        throw({maybe_retry_req, timeout})
    end.


%% Base URL ++ path ++ encoded query string.
full_url(#httpdb{url = BaseUrl}, Params) ->
    Path = get_value(path, Params, []),
    QueryArgs = get_value(qs, Params, []),
    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).


%% Encode [{Key, Value}] as "?k1=v1&k2=v2" (values URL-quoted, keys as-is),
%% or "" when there are no arguments.
query_args_to_string([], []) ->
    "";
query_args_to_string([], Acc) ->
    "?" ++ string:join(lists:reverse(Acc), "&");
query_args_to_string([{K, V} | Rest], Acc) ->
    query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).


%% Build the OAuth 1.0 Authorization header for a request, or [] when HttpDb
%% has no OAuth credentials. The signed URL is base URL ++ path; the query
%% parameters are signed too but excluded from the header itself.
oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
    [];
oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
    Consumer = {
        OAuth#oauth.consumer_key,
        OAuth#oauth.consumer_secret,
        OAuth#oauth.signature_method
    },
    % Same method-name conversion as maybe_retry/5 and report_error/4. The
    % previous get/post/put/head-only case crashed with case_clause for any
    % other HTTP method (e.g. delete) on OAuth-enabled endpoints.
    Method = string:to_upper(atom_to_list(get_value(method, ConnParams, get))),
    QSL = get_value(qs, ConnParams, []),
    OAuthParams = oauth:signed_params(Method,
        BaseUrl ++ get_value(path, ConnParams, []),
        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
    [{"Authorization",
        "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].


%% Follow an HTTP redirect: release the worker, compute the absolute target
%% URL and re-send the request against it.
do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
    release_worker(Worker, HttpDb),
    RedirectUrl = redirect_url(Headers, Url),
    {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
    send_req(HttpDb2, Params2, Cb).


%% Build the absolute redirect URL from the Location header. Credentials from
%% the original URL are re-attached, and IPv6 hosts are bracketed.
%% NOTE(review): Location must be an absolute URL, since a relative one would
%% not match #url{} here. Credentials are also forwarded even when the
%% redirect points at a different host -- confirm this is intended.
redirect_url(RespHeaders, OrigUrl) ->
    MochiHeaders = mochiweb_headers:make(RespHeaders),
    RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
    #url{
        host = Host,
        host_type = HostType,
        port = Port,
        path = Path,  % includes query string
        protocol = Proto
    } = ibrowse_lib:parse_url(RedUrl),
    #url{
        username = User,
        password = Passwd
    } = ibrowse_lib:parse_url(OrigUrl),
    Creds = case is_list(User) andalso is_list(Passwd) of
    true ->
        User ++ ":" ++ Passwd ++ "@";
    false ->
        []
    end,
    HostPart = case HostType of
    ipv6_address ->
        "[" ++ Host ++ "]";
    _ ->
        Host
    end,
    atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
        integer_to_list(Port) ++ Path.

%% A 303 redirect switches the method to GET; other codes keep it.
after_redirect(RedirectUrl, 303, HttpDb, Params) ->
    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
    after_redirect(RedirectUrl, HttpDb, Params).

%% The redirect URL already contains path and query string, so both are
%% dropped from Params and the URL becomes the new base URL.
after_redirect(RedirectUrl, HttpDb, Params) ->
    Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
    {HttpDb#httpdb{url = RedirectUrl}, Params2}.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
new file mode 100644
index 0000000..b065b7c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -0,0 +1,138 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License.
% You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% Pool of ibrowse worker processes (one HTTP connection each) for one URL.
-module(couch_replicator_httpc_pool).
-behaviour(gen_server).

% public API
-export([start_link/2, stop/1]).
-export([get_worker/1, release_worker/2]).

% gen_server API
-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).

-include("couch_db.hrl").

-import(couch_util, [
    get_value/2,
    get_value/3
]).

-record(state, {
    url,
    limit,                  % max # of workers allowed
    free = [],              % free workers (connections)
    busy = [],              % busy workers (connections)
    waiting = queue:new()   % blocked clients waiting for a worker
}).


%% Start a pool for Url. Options: [{max_connections, N}].
start_link(Url, Options) ->
    gen_server:start_link(?MODULE, {Url, Options}, []).


stop(Pool) ->
    ok = gen_server:call(Pool, stop, infinity).


%% Check out a worker; blocks (no timeout) while the pool is at its limit.
get_worker(Pool) ->
    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).


%% Return a worker to the pool. Asynchronous; unknown or dead workers are
%% ignored by the pool.
release_worker(Pool, Worker) ->
    ok = gen_server:cast(Pool, {release_worker, Worker}).


init({Url, Options}) ->
    % Workers are spawn_link'ed by this process; trap their exits so
    % handle_info/2 can drop dead ones.
    process_flag(trap_exit, true),
    State = #state{
        url = Url,
        limit = get_value(max_connections, Options)
    },
    {ok, State}.


%% get_worker: hand out a free worker, spawning a new one if none is free,
%% unless the busy list has reached the limit. In that case the caller is
%% queued and answered later from handle_cast/2 or handle_info/2.
%% NOTE(review): if max_connections was not supplied, Limit is undefined;
%% in Erlang term order an integer is always less than an atom, so the pool
%% would never block -- confirm callers always pass it.
handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
    case length(Busy) >= Limit of
    true ->
        {noreply, State#state{waiting = queue:in(From, Waiting)}};
    false ->
        case Free of
        [] ->
            {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
            Free2 = Free;
        [Worker | Free2] ->
            ok
        end,
        NewState = State#state{free = Free2, busy = [Worker | Busy]},
        {reply, {ok, Worker}, NewState}
    end;

handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.


%% release_worker: accept only live workers that are currently busy.
%% If a client is waiting, the worker goes straight to it and stays in the
%% busy list; otherwise it moves from busy to free.
%% NOTE(review): releasing the same worker twice while clients are queued
%% hands it out twice; a waiter that died before the reply also keeps the
%% worker busy forever.
handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
    case is_process_alive(Worker) andalso
        lists:member(Worker, State#state.busy) of
    true ->
        case queue:out(Waiting) of
        {empty, Waiting2} ->
            Busy2 = State#state.busy -- [Worker],
            Free2 = [Worker | State#state.free];
        {{value, From}, Waiting2} ->
            gen_server:reply(From, {ok, Worker}),
            Busy2 = State#state.busy,
            Free2 = State#state.free
        end,
        NewState = State#state{
            busy = Busy2,
            free = Free2,
            waiting = Waiting2
        },
        {noreply, NewState};
    false ->
        {noreply, State}
    end.


%% A linked process exited.
%%   - A dead free worker is dropped.
%%   - A dead busy worker frees a slot: it is given to the next waiting
%%     client as a freshly spawned worker, if any client is waiting.
%%   - Exits from unknown pids are ignored.
handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
    case Free -- [Pid] of
    Free ->
        case Busy -- [Pid] of
        Busy ->
            {noreply, State};
        Busy2 ->
            case queue:out(State#state.waiting) of
            {empty, _} ->
                {noreply, State#state{busy = Busy2}};
            {{value, From}, Waiting2} ->
                {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url),
                gen_server:reply(From, {ok, Worker}),
                {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}}
            end
        end;
    Free2 ->
        {noreply, State#state{free = Free2}}
    end.


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%% Stop every worker, free and busy.
terminate(_Reason, State) ->
    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpd.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
new file mode 100644
index 0000000..eccf885
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -0,0 +1,66 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_httpd).

-include("couch_db.hrl").

-import(couch_httpd, [
    send_json/2,
    send_json/3,
    send_method_not_allowed/2
]).

-import(couch_util, [
    to_binary/1
]).

-export([handle_req/1]).


%% HTTP handler for POST /_replicate.
%%
%% The JSON body is validated, parsed into a #rep{} as the requesting user,
%% and run through couch_replicator:replicate/1. The result is then mapped to
%% a status code:
%%   - {error, {E, R}} and {error, not_found} -> 404;
%%   - any other error -> 500;
%%   - a cancelled replication -> 200;
%%   - a continuous replication -> 202;
%%   - a completed one-shot replication -> 200 with its history.
%% Any other method gets 405.
handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
    validate_rep_props(Props),
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
    case couch_replicator:replicate(Rep) of
    {error, {Error, Reason}} ->
        send_json(
            Req, 404,
            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
    {error, not_found} ->
        % Tried to cancel a replication that didn't exist.
        send_json(Req, 404, {[{error, <<"not found">>}]});
    {error, Reason} ->
        send_json(Req, 500, {[{error, to_binary(Reason)}]});
    {ok, {cancelled, RepId}} ->
        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
    {ok, {continuous, RepId}} ->
        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
    {ok, {HistoryResults}} ->
        send_json(Req, {[{ok, true} | HistoryResults]})
    end;

handle_req(Req) ->
    send_method_not_allowed(Req, "POST").

%% Reject a request whose "query_params" object has non-string values; the
%% error is thrown as {bad_request, Msg}. All other properties are accepted.
validate_rep_props([]) ->
    ok;
validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
    lists:foreach(fun
        ({_,V}) when is_binary(V) -> ok;
        % Restored binary: the archived copy had this literal stripped to
        % `<>`, which does not compile.
        ({K,_}) -> throw({bad_request,
            <<K/binary," value must be a string.">>})
        end, Params),
    validate_rep_props(Rest);
validate_rep_props([_|Rest]) ->
    validate_rep_props(Rest).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_job_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
new file mode 100644
index 0000000..e8a7b96
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_job_sup.erl
@@ -0,0 +1,31 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_job_sup).
-behaviour(supervisor).
-export([init/1, start_link/0]).

-include("couch_db.hrl").

%% Start the locally registered replication job supervisor.
start_link() ->
    supervisor:start_link({local,?MODULE}, ?MODULE, []).

%%=============================================================================
%% supervisor callbacks
%%=============================================================================

%% No static children: replication jobs are added dynamically. At most 3
%% restarts in 10 seconds are tolerated.
init([]) ->
    {ok, {{one_for_one, 3, 10}, []}}.

%%=============================================================================
%% internal functions
%%=============================================================================
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
new file mode 100644
index 0000000..c61b27b
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -0,0 +1,626 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% Watches the replicator database and starts, tracks and cancels the
%% replications described by its documents.
-module(couch_replicator_manager).
-behaviour(gen_server).

% public API
-export([replication_started/1, replication_completed/1, replication_error/2]).

% gen_server callbacks
-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).

-include("couch_db.hrl").
-include("couch_replicator.hrl").
-include("couch_replicator_js_functions.hrl").

% ETS table: replication document id -> replication id.
-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
% ETS table: replication id -> #rep_state{}.
-define(REP_TO_STATE, couch_rep_id_to_rep_state).
-define(INITIAL_WAIT, 2.5).
% seconds (unit of the ?INITIAL_WAIT value defined just above)
-define(MAX_WAIT, 600).        % seconds

%% Per-replication bookkeeping kept in the ?REP_TO_STATE table.
-record(rep_state, {
    rep,
    starting,          % true until replication_started/1 reports in
    retries_left,
    max_retries,
    wait = ?INITIAL_WAIT
}).

-import(couch_util, [
    get_value/2,
    get_value/3,
    to_binary/1
]).

-record(state, {
    changes_feed_loop = nil,
    db_notifier = nil,
    rep_db_name = nil,
    rep_start_pids = [],
    max_retries
}).


start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


%% Called when a replication has started. If it is tracked here (i.e. it was
%% triggered by a replicator document), the document is marked "triggered"
%% and tagged with the base replication id, and the manager resets its
%% retry state.
replication_started(#rep{id = {BaseId, _} = RepId}) ->
    case rep_state(RepId) of
    nil ->
        ok;
    #rep_state{rep = #rep{doc_id = DocId}} ->
        update_rep_doc(DocId, [
            {<<"_replication_state">>, <<"triggered">>},
            {<<"_replication_id">>, ?l2b(BaseId)}]),
        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
        ?LOG_INFO("Document `~s` triggered replication `~s`",
            [DocId, pp_rep_id(RepId)])
    end.


%% Called when a tracked replication finished: marks its document
%% "completed" and removes the replication's state.
replication_completed(#rep{id = RepId}) ->
    case rep_state(RepId) of
    nil ->
        ok;
    #rep_state{rep = #rep{doc_id = DocId}} ->
        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
            [pp_rep_id(RepId), DocId])
    end.


%% Called when a tracked replication failed: marks its document "error" and
%% lets the manager decide whether to retry.
replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
    case rep_state(RepId) of
    nil ->
        ok;
    #rep_state{rep = #rep{doc_id = DocId}} ->
        % TODO: maybe add error reason to replication document
        update_rep_doc(DocId, [
            {<<"_replication_state">>, <<"error">>},
            {<<"_replication_id">>, ?l2b(BaseId)}]),
        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
    end.


%% Set up the two named ETS tables (protected: other processes may read
%% them) and subscribe to "replicator" config changes. Then start the
%% replicator-database changes listener and the database-creation notifier.
%% NOTE(review): the registered config fun only has clauses for "db" and
%% "max_replication_retry_count" -- confirm couch_config never calls it
%% with other keys.
init(_) ->
    process_flag(trap_exit, true),
    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
    Server = self(),
    ok = couch_config:register(
        fun("replicator", "db", NewName) ->
            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
        ("replicator", "max_replication_retry_count", V) ->
            ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
        end
    ),
    {Loop, RepDbName} = changes_feed_loop(),
    {ok, #state{
        changes_feed_loop = Loop,
        rep_db_name = RepDbName,
        db_notifier = db_update_notifier(),
        max_retries = retries_value(
            couch_config:get("replicator", "max_replication_retry_count", "10"))
    }}.


%% A change to a replicator document, sent synchronously by the changes
%% listener. Any exception while processing it is logged, the document is
%% marked "error", and the previous state is kept.
handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
    NewState = try
        process_update(State, Change)
    catch
    _Tag:Error ->
        {RepProps} = get_value(doc, ChangeProps),
        DocId = get_value(<<"_id">>, RepProps),
        rep_db_update_error(Error, DocId),
        State
    end,
    {reply, ok, NewState};


%% The replication is running: clear `starting` and reset the retry budget
%% and back-off wait.
handle_call({rep_started, RepId}, _From, State) ->
    case rep_state(RepId) of
    nil ->
        ok;
    RepState ->
        NewRepState = RepState#rep_state{
            starting = false,
            retries_left = State#state.max_retries,
            max_retries = State#state.max_retries,
            wait = ?INITIAL_WAIT
        },
        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
    end,
    {reply, ok, State};

handle_call({rep_complete, RepId}, _From, State) ->
    true = ets:delete(?REP_TO_STATE, RepId),
    {reply, ok, State};

handle_call({rep_error, RepId, Error}, _From, State) ->
    {reply, ok, replication_error(State, RepId, Error)};

handle_call(Msg, From, State) ->
    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
        [Msg, From]),
    {stop, {error, {unexpected_call, Msg}}, State}.


%% Replicator database renamed in config or (re)created: restart watching,
%% unless the event refers to the database already being watched.
handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
    {noreply, State};

handle_cast({rep_db_changed, _NewName}, State) ->
    {noreply, restart(State)};

handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
    {noreply, State};

handle_cast({rep_db_created, _NewName}, State) ->
    {noreply, restart(State)};

handle_cast({set_max_retries, MaxRetries}, State) ->
    {noreply, State#state{max_retries = MaxRetries}};

handle_cast(Msg, State) ->
    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
    {stop, {error, {unexpected_cast, Msg}}, State}.


%% Exit handling (exits are trapped in init/1).
%% Only normal exits of the changes loop and of start processes are
%% expected. Any other exit, including an abnormal exit of a start process,
%% reaches the last clause and stops the manager.
handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
    % replicator DB deleted
    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};

handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
    {stop, {db_update_notifier_died, Reason}, State};

handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
    % one of the replication start processes terminated successfully
    {noreply, State#state{rep_start_pids = Pids -- [From]}};

handle_info({'DOWN', _Ref, _, _, _}, State) ->
    % From a db monitor created by a replication process. Ignore.
    {noreply, State};

handle_info(Msg, State) ->
    ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
    {stop, {unexpected_msg, Msg}, State}.


%% Stop all replications and kill the changes loop and start processes.
%% `catch` tolerates a nil loop pid. Then drop both ETS tables and stop the
%% database notifier.
terminate(_Reason, State) ->
    #state{
        rep_start_pids = StartPids,
        changes_feed_loop = Loop,
        db_notifier = DbNotifier
    } = State,
    stop_all_replications(),
    lists:foreach(
        fun(Pid) ->
            catch unlink(Pid),
            catch exit(Pid, stop)
        end,
        [Loop | StartPids]),
    true = ets:delete(?REP_TO_STATE),
    true = ets:delete(?DOC_TO_REP),
    couch_db_update_notifier:stop(DbNotifier).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%% Spawn a linked process that follows the replicator database's continuous
%% _changes feed, with docs included. Each change to a non-design document
%% is forwarded synchronously to the manager as {rep_db_update, Change}.
%% Returns {LoopPid, RepDbName}.
changes_feed_loop() ->
    {ok, RepDb} = ensure_rep_db_exists(),
    RepDbName = couch_db:name(RepDb),
    couch_db:close(RepDb),
    Server = self(),
    Pid = spawn_link(
        fun() ->
            DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
            {ok, Db} = couch_db:open_int(RepDbName, DbOpenOptions),
            ChangesFeedFun = couch_changes:handle_changes(
                #changes_args{
                    include_docs = true,
                    feed = "continuous",
                    timeout = infinity,
                    db_open_options = [sys_db]
                },
                {json_req, null},
                Db
            ),
            ChangesFeedFun(
                fun({change, Change, _}, _) ->
                    case has_valid_rep_id(Change) of
                    true ->
                        ok = gen_server:call(
                            Server, {rep_db_update, Change}, infinity);
                    false ->
                        ok
                    end;
                (_, _) ->
                    ok
                end
            ),
            couch_db:close(Db)
        end
    ),
    {Pid, RepDbName}.


%% True unless the change's document id is a design document id.
%% The design-doc clause restores the binary pattern that the archived copy
%% had stripped to `<>` (not valid Erlang). ?DESIGN_DOC_PREFIX comes from
%% couch_db.hrl, which this module includes.
has_valid_rep_id({Change}) ->
    has_valid_rep_id(get_value(<<"id">>, Change));
has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
    false;
has_valid_rep_id(_Else) ->
    true.


%% Start a notifier that casts {rep_db_created, Name} to the manager when
%% the configured replicator database (default "_replicator") is created.
db_update_notifier() ->
    Server = self(),
    {ok, Notifier} = couch_db_update_notifier:start_link(
        fun({created, DbName}) ->
            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
            DbName ->
                ok = gen_server:cast(Server, {rep_db_created, DbName});
            _ ->
                ok
            end;
        (_) ->
            % no need to handle the 'deleted' event - the changes feed loop
            % dies when the database is deleted
            ok
        end
    ),
    Notifier.


%% Stop everything tied to the current replicator database, including the
%% changes loop and pending start processes, then begin following the
%% (possibly new) database.
restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
    stop_all_replications(),
    lists:foreach(
        fun(Pid) ->
            catch unlink(Pid),
            catch exit(Pid, rep_db_changed)
        end,
        [Loop | StartPids]),
    {NewLoop, NewRepDbName} = changes_feed_loop(),
    State#state{
        changes_feed_loop = NewLoop,
        rep_db_name = NewRepDbName,
        rep_start_pids = []
    }.


%% Apply one replicator-database change to the manager state and return the
%% (possibly updated) #state{}.
%%   - A deleted document cancels its replication.
%%   - "completed" forgets the document mapping.
%%   - "error" restarts the replication only if the document is not tracked.
%%   - A missing or "triggered" state (re)starts the replication.
process_update(State, {Change}) ->
    JsonRepDoc = get_value(doc, Change),
    {RepProps} = JsonRepDoc,
    DocId = get_value(<<"_id">>, RepProps),
    case get_value(<<"deleted">>, Change, false) of
    false ->
        case get_value(<<"_replication_state">>, RepProps) of
        <<"completed">> ->
            replication_complete(DocId),
            State;
        <<"error">> ->
            % ?DOC_TO_REP is a set table, so membership equals a non-empty
            % lookup.
            case ets:member(?DOC_TO_REP, DocId) of
            true ->
                State;
            false ->
                maybe_start_replication(State, DocId, JsonRepDoc)
            end;
        ReplState when ReplState =:= undefined; ReplState =:= <<"triggered">> ->
            maybe_start_replication(State, DocId, JsonRepDoc)
        end;
    true ->
        rep_doc_deleted(DocId),
        State
    end.


%% Log a failure to process a replication document and mark it "error".
%% A {bad_rep_doc, Reason} error already carries a printable reason; any
%% other error is converted with to_binary/1.
rep_db_update_error(Error, DocId) ->
    Reason = case Error of
        {bad_rep_doc, BadDocReason} -> BadDocReason;
        _ -> to_binary(Error)
    end,
    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
        [DocId, Reason]),
    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).


%% Build the #user_ctx{} a replication runs as from the document's optional
%% "user_ctx" object. Missing name/roles default to null/[]; a document
%% without "user_ctx" gets a default #user_ctx{}.
rep_user_ctx({RepDoc}) ->
    case get_value(<<"user_ctx">>, RepDoc) of
    {CtxProps} ->
        #user_ctx{
            name = get_value(<<"name">>, CtxProps, null),
            roles = get_value(<<"roles">>, CtxProps, [])
        };
    undefined ->
        #user_ctx{}
    end.
%% Starts the replication described by RepDoc (document DocId) unless a
%% replication with the same id is already recorded. A new one is stored in
%% ?REP_TO_STATE / ?DOC_TO_REP with `starting = true` and launched from a
%% linked helper process. If a different document already owns the same
%% replication, this document is only tagged with the replication id.
%% parse_rep_doc/1 throws `{bad_rep_doc, Reason}` for invalid documents.
%% Returns the updated server state.
maybe_start_replication(State, DocId, RepDoc) ->
    Rep = parse_rep_doc(RepDoc),
    #rep{id = {BaseId, _} = RepId} = Rep,
    case rep_state(RepId) of
    nil ->
        MaxRetries = State#state.max_retries,
        NewRepState = #rep_state{
            rep = Rep,
            starting = true,
            retries_left = MaxRetries,
            max_retries = MaxRetries
        },
        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
            [pp_rep_id(RepId), DocId]),
        StarterPid = spawn_link(fun() -> start_replication(Rep, 0) end),
        PendingPids = State#state.rep_start_pids,
        State#state{rep_start_pids = [StarterPid | PendingPids]};
    #rep_state{rep = #rep{doc_id = DocId}} ->
        % This very document already owns the replication.
        State;
    #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
        ?LOG_INFO("The replication specified by the document `~s` was already"
            " triggered by the document `~s`", [DocId, OtherDocId]),
        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
        State;
    #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
        ?LOG_INFO("The replication specified by the document `~s` is already"
            " being triggered by the document `~s`", [DocId, OtherDocId]),
        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
        State
    end.


%% Parses a replication document into a #rep{} record. Any exception raised
%% while parsing (including while building the user context) is rethrown as
%% `{bad_rep_doc, Reason}`.
parse_rep_doc(RepDoc) ->
    {ok, Rep} = try
        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
    catch
    throw:{error, Why} ->
        throw({bad_rep_doc, Why});
    Class:Reason ->
        throw({bad_rep_doc, to_binary({Class, Reason})})
    end,
    Rep.


%% Writes `_replication_id` = RepId into document DocId unless the document
%% already carries exactly that value.
maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
    case get_value(<<"_replication_id">>, RepProps) =:= RepId of
    true ->
        ok;
    false ->
        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
    end.


%% Sleeps Wait seconds, then asks couch_replicator to start Rep
%% asynchronously. Anything other than `{ok, _}` (including a caught
%% exception) is passed to replication_error/2.
start_replication(Rep, Wait) ->
    ok = timer:sleep(Wait * 1000),
    Outcome = (catch couch_replicator:async_replicate(Rep)),
    case Outcome of
    {ok, _} ->
        ok;
    _ ->
        replication_error(Rep, Outcome)
    end.
%% Handles a document whose replication state became "completed". If the
%% document is known in ?DOC_TO_REP and its replication is no longer
%% recorded in ?REP_TO_STATE, the finished child is removed from
%% couch_replicator_job_sup. The ?DOC_TO_REP entry is then dropped.
%% Unknown documents are ignored.
replication_complete(DocId) ->
    case ets:lookup(?DOC_TO_REP, DocId) of
    [{DocId, {BaseId, Ext} = RepId}] ->
        case rep_state(RepId) of
        #rep_state{} ->
            ok;
        nil ->
            % Prior to OTP R14B02, temporary child specs remain in the
            % supervisor after a worker finishes - remove them so the same
            % replication can later be started again, possibly with
            % different values for parameters that don't contribute to its
            % ID calculation.
            ChildId = BaseId ++ Ext,
            _ = supervisor:delete_child(couch_replicator_job_sup, ChildId)
        end,
        true = ets:delete(?DOC_TO_REP, DocId);
    _ ->
        ok
    end.


%% Cancels the replication owned by a deleted document and forgets it in
%% both ETS tables. A document with no recorded replication is ignored.
rep_doc_deleted(DocId) ->
    case ets:lookup(?DOC_TO_REP, DocId) of
    [] ->
        ok;
    [{DocId, RepId}] ->
        couch_replicator:cancel_replication(RepId),
        true = ets:delete(?REP_TO_STATE, RepId),
        true = ets:delete(?DOC_TO_REP, DocId),
        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
            " was deleted", [pp_rep_id(RepId), DocId])
    end.


%% Server-side error handling for replication RepId. If the replication is
%% no longer recorded, State is returned unchanged; otherwise the decision
%% is delegated to maybe_retry_replication/3.
replication_error(State, RepId, Error) ->
    RepState = rep_state(RepId),
    case RepState =:= nil of
    true ->
        State;
    false ->
        maybe_retry_replication(RepState, Error, State)
    end.
%% No retries left: cancel the replication, forget it in both ETS tables
%% and log the final error. The server state is returned unchanged.
maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
    #rep_state{
        rep = #rep{id = RepId, doc_id = DocId},
        max_retries = MaxRetries
    } = RepState,
    couch_replicator:cancel_replication(RepId),
    true = ets:delete(?REP_TO_STATE, RepId),
    true = ets:delete(?DOC_TO_REP, DocId),
    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
        "~nReached maximum retry attempts (~p).",
        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
    State;
%% Retries remain (or are unlimited): store the backed-off state from
%% state_after_error/1 and spawn a linked process that restarts the
%% replication after the new wait (in seconds). The starter pid is
%% remembered in the server state.
maybe_retry_replication(RepState, Error, State) ->
    #rep_state{
        rep = #rep{id = RepId, doc_id = DocId} = Rep
    } = RepState,
    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
        "~nRestarting replication in ~p seconds.",
        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.


%% Cancels every replication recorded in ?DOC_TO_REP and empties both ETS
%% tables. Used when the replicator database is deleted or changed.
stop_all_replications() ->
    ?LOG_INFO("Stopping all ongoing replications because the replicator"
        " database was deleted or changed", []),
    ets:foldl(
        fun({_, RepId}, _) ->
            couch_replicator:cancel_replication(RepId)
        end,
        ok, ?DOC_TO_REP),
    true = ets:delete_all_objects(?REP_TO_STATE),
    true = ets:delete_all_objects(?DOC_TO_REP).


%% Merges KVs into the latest revision of replication document RepDocId
%% (see update_rep_doc/3). A missing document is ignored (`ok`). On a
%% `conflict` throw, the update is logged and retried after 5 ms.
%% The database handle is always closed before a retry starts, so retries
%% neither accumulate open handles nor grow the stack.
update_rep_doc(RepDocId, KVs) ->
    {ok, RepDb} = ensure_rep_db_exists(),
    Outcome = try
        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
        {ok, LatestRepDoc} ->
            {done, update_rep_doc(RepDb, LatestRepDoc, KVs)};
        _ ->
            {done, ok}
        end
    catch throw:conflict ->
        retry
    after
        couch_db:close(RepDb)
    end,
    case Outcome of
    {done, Result} ->
        Result;
    retry ->
        % Shouldn't happen, as by default only the role _replicator can
        % update replication documents.
        ?LOG_ERROR("Conflict error when updating replication document `~s`."
            " Retrying.", [RepDocId]),
        ok = timer:sleep(5),
        update_rep_doc(RepDocId, KVs)
    end.
%% Merges KVs into RepDoc's body and saves it to RepDb only when something
%% actually changed. Setting `_replication_state` to a new value also
%% stamps `_replication_state_time` with timestamp/0; setting it to its
%% current value is a no-op. Returns `ok` when nothing changed, otherwise
%% the result of couch_db:update_doc/3.
update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
    NewRepDocBody = lists:foldl(
        fun({<<"_replication_state">> = K, State} = KV, Body) ->
                case get_value(K, Body) of
                State ->
                    Body;
                _ ->
                    Body1 = lists:keystore(K, 1, Body, KV),
                    lists:keystore(
                        <<"_replication_state_time">>, 1, Body1,
                        {<<"_replication_state_time">>, timestamp()})
                end;
            ({K, _V} = KV, Body) ->
                lists:keystore(K, 1, Body, KV)
        end,
        RepDocBody, KVs),
    case NewRepDocBody of
    RepDocBody ->
        ok;
    _ ->
        % Might not succeed - when the replication doc is deleted right
        % before this update (not an error, ignore).
        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
    end.


%% RFC 3339 timestamp of the current local time including its UTC offset,
%% e.g. <<"2011-12-05T09:34:26+00:00">>, as a binary.
%% Note: doesn't include the seconds fraction (RFC 3339 says it's optional).
timestamp() ->
    % Read the clock once and derive both the local wall time and the UTC
    % offset from that single instant, so the printed fields and the offset
    % always describe the same moment (even across a second boundary or a
    % DST switch).
    UTime = erlang:universaltime(),
    LocalTime = calendar:universal_time_to_local_time(UTime),
    {{Year, Month, Day}, {Hour, Min, Sec}} = LocalTime,
    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
        calendar:datetime_to_gregorian_seconds(UTime),
    iolist_to_binary(
        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
            [Year, Month, Day, Hour, Min, Sec,
                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).

%% Formats a UTC offset as "+HH:MM" or "-HH:MM". Hr and Min come from
%% `div`/`rem` of the same signed number of seconds, so they share its sign.
zone(Hr, Min) when Hr >= 0, Min >= 0 ->
    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
zone(Hr, Min) ->
    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).



%% Opens the replicator database named by the "replicator"/"db" config
%% entry (default "_replicator") as a sys_db with the _admin and _replicator
%% roles. If opening fails, the database is created. Either way, its
%% "_design/_replicator" validation design doc is ensured. Returns
%% `{ok, Db}`; callers close Db themselves.
ensure_rep_db_exists() ->
    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
    case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
    {ok, Db} ->
        Db;
    _Error ->
        {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
    end,
    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
    {ok, Db}.
%% Creates the design document DDocID in RepDb, holding the replicator
%% validate_doc_update function (?REP_DB_DOC_VALIDATE_FUN), unless it can
%% already be opened. Returns `ok` if it existed, otherwise `{ok, Rev}`.
ensure_rep_ddoc_exists(RepDb, DDocID) ->
    case couch_db:open_doc(RepDb, DDocID, []) of
    {ok, _Doc} ->
        ok;
    _ ->
        DDocJson = {[
            {<<"_id">>, DDocID},
            {<<"language">>, <<"javascript">>},
            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
        ]},
        {ok, _Rev} = couch_db:update_doc(
            RepDb, couch_doc:from_json_obj(DDocJson), [])
    end.


%% Pretty-prints a replication id (or the id of a #rep{}) as the
%% concatenation of its base and extension strings.
pp_rep_id({Base, Extension}) ->
    Base ++ Extension;
pp_rep_id(#rep{id = RepId}) ->
    pp_rep_id(RepId).


%% Looks up the #rep_state{} stored for RepId in ?REP_TO_STATE; returns
%% `nil` when there is none.
rep_state(RepId) ->
    case ets:lookup(?REP_TO_STATE, RepId) of
    [] ->
        nil;
    [{RepId, RepState}] ->
        RepState
    end.


%% Unwraps `{error, Reason}` to Reason; any other term is returned as is.
error_reason(Error) ->
    case Error of
    {error, Reason} ->
        Reason;
    _ ->
        Error
    end.


%% Converts a retries config string: "infinity" becomes the atom `infinity`,
%% anything else is parsed as an integer.
retries_value(Value) ->
    case Value of
    "infinity" ->
        infinity;
    _ ->
        list_to_integer(Value)
    end.


%% Backs off after a failed attempt: doubles the wait (capped at ?MAX_WAIT)
%% and, unless retries are unlimited, consumes one retry.
state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
    NextWait = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
    Delayed = State#rep_state{wait = NextWait},
    case Left of
    infinity ->
        Delayed;
    _ ->
        Delayed#rep_state{retries_left = Left - 1}
    end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_notifier.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl new file mode 100644 index 0000000..3b48b71 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_notifier.erl @@ -0,0 +1,57 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the +% License for the specific language governing permissions and limitations under +% the License.

-module(couch_replicator_notifier).

%% gen_event handler that lets a process subscribe a callback to the
%% `couch_replication` event manager. The handler state is either a 1-arity
%% fun (called for its side effects) or a `{Fun, Acc}` pair with a 2-arity
%% fun whose accumulator is threaded through successive events.

-behaviour(gen_event).

% public API
-export([start_link/1, stop/1, notify/1]).

% gen_event callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_event/2, handle_call/2, handle_info/2]).

-include("couch_db.hrl").

%% Installs a uniquely identified handler of this module on the
%% `couch_replication` event manager through couch_event_sup; FunAcc becomes
%% the handler state.
start_link(FunAcc) ->
    HandlerId = {?MODULE, make_ref()},
    couch_event_sup:start_link(couch_replication, HandlerId, FunAcc).

%% Sends Event to every handler registered on `couch_replication`.
notify(Event) ->
    gen_event:notify(couch_replication, Event).

%% Stops the couch_event_sup process returned by start_link/1.
stop(Pid) ->
    couch_event_sup:stop(Pid).


init(FunAcc) ->
    {ok, FunAcc}.

terminate(_Reason, _State) ->
    ok.

%% Accumulating subscriber: feed the event and the current accumulator to
%% Fun and keep its result as the new accumulator.
handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
    {ok, {Fun, Fun(Event, Acc)}};
%% Plain subscriber: call Fun for its side effects; the state never changes.
handle_event(Event, Fun) when is_function(Fun, 1) ->
    _ = Fun(Event),
    {ok, Fun}.

handle_call(_Msg, State) ->
    {reply, ok, State}.

handle_info(_Msg, State) ->
    {ok, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_utils.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 6cc4db8..d4fc0e7 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -19,7 +19,7 @@ -export([sum_stats/2]). -include("couch_db.hrl"). --include("couch_api_wrap.hrl"). +-include("couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -include("../ibrowse/ibrowse.hrl"). @@ -109,23 +109,23 @@ filter_code(Filter, Source, UserCtx) -> _ -> throw({error, <<"Invalid filter.
Must match `ddocname/filtername`.">>}) end, - Db = case (catch couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of + Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of {ok, Db0} -> Db0; DbError -> DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", - [couch_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]), + [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]), throw({error, iolist_to_binary(DbErrorMsg)}) end, try - Body = case (catch couch_api_wrap:open_doc( + Body = case (catch couch_replicator_api_wrap:open_doc( Db, <<"_design/", DDocName/binary>>, [ejson_body])) of {ok, #doc{body = Body0}} -> Body0; DocError -> DocErrorMsg = io_lib:format( "Couldn't open document `_design/~s` from source " - "database `~s`: ~s", [DDocName, couch_api_wrap:db_uri(Source), + "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DocError)]), throw({error, iolist_to_binary(DocErrorMsg)}) end, @@ -133,7 +133,7 @@ filter_code(Filter, Source, UserCtx) -> Body, [<<"filters">>, FilterName]), re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}]) after - couch_api_wrap:db_close(Db) + couch_replicator_api_wrap:db_close(Db) end.