Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2CE19200B7A for ; Mon, 5 Sep 2016 15:37:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2B64C160ACB; Mon, 5 Sep 2016 13:37:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4DFDB160ABC for ; Mon, 5 Sep 2016 15:37:31 +0200 (CEST) Received: (qmail 98604 invoked by uid 500); 5 Sep 2016 13:37:27 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 98442 invoked by uid 99); 5 Sep 2016 13:37:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Sep 2016 13:37:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D573E0105; Mon, 5 Sep 2016 13:37:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: couch-replicator commit: updated refs/heads/2980-cluster-local-repl to ae7a905 Date: Mon, 5 Sep 2016 13:37:27 +0000 (UTC) archived-at: Mon, 05 Sep 2016 13:37:32 -0000 Repository: couchdb-couch-replicator Updated Branches: refs/heads/2980-cluster-local-repl [created] ae7a90592 support cluster-local source/target WIP Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/ae7a9059 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/ae7a9059 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/ae7a9059 Branch: refs/heads/2980-cluster-local-repl Commit: ae7a905925792013371331a38d959f77cdc97cd3 Parents: 0248d23 Author: Robert Newson Authored: Mon Sep 5 12:30:41 2016 +0100 Committer: Robert Newson Committed: Mon Sep 5 14:36:54 2016 +0100 ---------------------------------------------------------------------- src/couch_replicator.erl | 15 +++++++- src/couch_replicator_api_wrap.erl | 70 ++++++++++++++++++++++++++++++++-- src/couch_replicator_api_wrap.hrl | 2 + src/couch_replicator_manager.erl | 2 - 4 files changed, 81 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ae7a9059/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl index 7f0c7ee..ee91b5d 100644 --- a/src/couch_replicator.erl +++ b/src/couch_replicator.erl @@ -639,12 +639,15 @@ cancel_timer(#rep_state{timer = Timer} = State) -> init_state(Rep) -> #rep{ id = {BaseId, _Ext}, - source = Src0, target = Tgt, + source = Src0, target = Tgt0, options = Options, user_ctx = UserCtx, type = Type, view = View } = Rep, + % note if fabric should be used + Src1 = maybe_fabric(Rep, Src0), + Tgt = maybe_fabric(Rep, Tgt0), % Adjust minimum number of http source connections to 2 to avoid deadlock - Src = adjust_maxconn(Src0, BaseId), + Src = adjust_maxconn(Src1, BaseId), {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]), {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}], get_value(create_target, Options, false)), @@ -693,6 +696,14 @@ init_state(Rep) -> }, State#rep_state{timer = start_timer(State)}. +%% annotate "local" dbname with fabric tuple if +%% from a clustered database. +maybe_fabric(#rep{}, #httpdb{} = HttpDb) -> + HttpDb; +maybe_fabric(#rep{db_name = <<"shards/", _/binary>>}, DbName) -> + {fabric, DbName}; +maybe_fabric(#rep{}, DbName) -> + DbName. find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) -> LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId), http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ae7a9059/src/couch_replicator_api_wrap.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl index ff6b00c..11dedc6 100644 --- a/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator_api_wrap.erl @@ -1,4 +1,4 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at % @@ -62,10 +62,12 @@ db_uri(#httpdb{url = Url}) -> db_uri(#db{name = Name}) -> db_uri(Name); +db_uri(#fabricdb{name = Name}) -> + db_uri(Name); + db_uri(DbName) -> ?b2l(DbName). - db_open(Db, Options) -> db_open(Db, Options, false). @@ -121,9 +123,9 @@ db_open(DbName, Options, Create) -> true -> ok = couch_httpd:verify_is_server_admin( get_value(user_ctx, Options)), - couch_db:create(DbName, Options) + create(DbName, Options) end, - case couch_db:open(DbName, Options) of + case open(DbName, Options) of {error, {illegal_database_name, _}} -> throw({db_not_found, DbName}); {not_found, _Reason} -> @@ -148,6 +150,9 @@ get_db_info(#httpdb{} = Db) -> fun(200, _, {Props}) -> {ok, Props} end); +get_db_info(#fabricdb{name = DbName, user_ctx = UserCtx}) -> + fabric:get_security(DbName, [{user_ctx, UserCtx}]), + fabric:get_db_info(DbName); get_db_info(#db{name = DbName, user_ctx = UserCtx}) -> {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]), {ok, Info} = couch_db:get_db_info(Db), @@ -171,6 +176,9 @@ get_pending_count(#httpdb{} = Db, Seq) -> send_req(Db, Options, fun(200, _, {Props}) -> {ok, couch_util:get_value(<<"pending">>, Props, null)} end); +get_pending_count(#fabricdb{} = Db, Seq) -> + Args = #changes_args{since=Seq, limit=0, db_open_options=[{user_ctx, Db#fabricdb.user_ctx}]}, + with_fabric(changes, [Db#fabricdb.name, fun pending_callback/2, nil, Args]); get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) -> {ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]), Pending = couch_db:count_changes_since(CountDb, Seq), @@ -340,6 +348,13 @@ open_doc(#httpdb{} = Db, Id, Options) -> (_, _, {Props}) -> {error, get_value(<<"error">>, Props)} end); +open_doc(#fabricdb{} = Db, Id, Options) -> + case with_fabric(open_doc, [Db#fabricdb.name, Id, Options]) of + {ok, _} = Ok -> + Ok; + {not_found, _Reason} -> + {error, <<"not_found">>} + end; open_doc(Db, Id, Options) -> case couch_db:open_doc(Db, Id, Options) of {ok, _} = Ok -> @@ -972,3 +987,50 @@ header_value(Key, Headers, Default) -> _ -> Default end. + +%% fabric bits + +create({fabric, DbName}, Options) -> + with_fabric(create_db, [DbName, Options]); +create(DbName, Options) -> + couch_db:create(DbName, Options). + +open({fabric, DbName}, Options) -> + with_fabric(get_security, [DbName, Options]), + UserCtx = couch_util:get_value(user_ctx, Options, #user_ctx{}), + {ok, #fabricdb{name=DbName, user_ctx=UserCtx}}; +open(DbName, Options) -> + couch_db:open(DbName, Options). + +handle_db_changes(Args, Req, Db) -> + couch_changes:handle_db_changes(Args, Req, Db). + +pending_callback(start, Acc) -> + {ok, Acc}; +pending_callback({stop, _Seq, Pending}, _Acc) -> + {ok, Pending}. + +with_fabric(F, A) -> + {Pid, Ref} = spawn_monitor(fun() -> + try apply(fabric, F, A) of + Resp -> + exit({exit_ok, Resp}) + catch + throw:Reason -> + exit({exit_throw, Reason}); + error:Reason -> + exit({exit_error, Reason}); + exit:Reason -> + exit({exit_exit, Reason}) + end + end), + receive + {'DOWN', Ref, process, Pid, {exit_ok, Ret}} -> + Ret; + {'DOWN', Ref, process, Pid, {exit_throw, Reason}} -> + throw(Reason); + {'DOWN', Ref, process, Pid, {exit_error, Reason}} -> + erlang:error(Reason); + {'DOWN', Ref, process, Pid, {exit_exit, Reason}} -> + erlang:exit(Reason) + end. http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ae7a9059/src/couch_replicator_api_wrap.hrl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl index eee04da..a29e6a1 100644 --- a/src/couch_replicator_api_wrap.hrl +++ b/src/couch_replicator_api_wrap.hrl @@ -27,6 +27,8 @@ http_connections }). +-record(fabricdb, {name, user_ctx}). + -record(oauth, { consumer_key, token, http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ae7a9059/src/couch_replicator_manager.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl index 953b1bf..f973cf0 100644 --- a/src/couch_replicator_manager.erl +++ b/src/couch_replicator_manager.erl @@ -514,7 +514,6 @@ rep_user_ctx({RepDoc}) -> } end. - maybe_start_replication(State, DbName, DocId, RepDoc) -> #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc), Rep = Rep0#rep{db_name = DbName}, @@ -567,7 +566,6 @@ parse_rep_doc(RepDoc) -> end, Rep. - maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) -> case get_json_value(<<"_replication_id">>, RepProps) of RepId ->