Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A564E108C0 for ; Tue, 4 Feb 2014 23:51:47 +0000 (UTC) Received: (qmail 53323 invoked by uid 500); 4 Feb 2014 23:45:46 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 52536 invoked by uid 500); 4 Feb 2014 23:44:56 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 52116 invoked by uid 99); 4 Feb 2014 23:44:21 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Feb 2014 23:44:21 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A270E91B0E5; Tue, 4 Feb 2014 23:44:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davisp@apache.org To: commits@couchdb.apache.org Date: Tue, 04 Feb 2014 23:44:42 -0000 Message-Id: <1feefabbabf3458a9131e744a2e5f58a@git.apache.org> In-Reply-To: <6f807b3972824f1c95b927d0faf2f925@git.apache.org> References: <6f807b3972824f1c95b927d0faf2f925@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/44] couchdb commit: updated refs/heads/1843-feature-bigcouch-multi-repo to 2385a37 Remove src/fabric Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/753e7462 Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/753e7462 Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/753e7462 Branch: refs/heads/1843-feature-bigcouch-multi-repo Commit: 753e7462d7f5535ff275489042e7558f7bb0351f Parents: ed8c2fb Author: Paul J. Davis Authored: Tue Feb 4 17:41:11 2014 -0600 Committer: Paul J. Davis Committed: Tue Feb 4 17:41:11 2014 -0600 ---------------------------------------------------------------------- src/fabric/README.md | 24 - src/fabric/include/couch_db_tmp.hrl | 296 ------------- src/fabric/include/fabric.hrl | 36 -- src/fabric/src/fabric.app.src | 49 -- src/fabric/src/fabric.erl | 479 -------------------- src/fabric/src/fabric_db_create.erl | 159 ------- src/fabric/src/fabric_db_delete.erl | 93 ---- src/fabric/src/fabric_db_doc_count.erl | 66 --- src/fabric/src/fabric_db_info.erl | 102 ----- src/fabric/src/fabric_db_meta.erl | 157 ------- src/fabric/src/fabric_db_update_listener.erl | 150 ------- src/fabric/src/fabric_dict.erl | 49 -- src/fabric/src/fabric_doc_attachments.erl | 151 ------- src/fabric/src/fabric_doc_missing_revs.erl | 88 ---- src/fabric/src/fabric_doc_open.erl | 470 -------------------- src/fabric/src/fabric_doc_open_revs.erl | 305 ------------- src/fabric/src/fabric_doc_update.erl | 306 ------------- src/fabric/src/fabric_group_info.erl | 98 ---- src/fabric/src/fabric_rpc.erl | 516 ---------------------- src/fabric/src/fabric_util.erl | 171 ------- src/fabric/src/fabric_view.erl | 344 --------------- src/fabric/src/fabric_view_all_docs.erl | 212 --------- src/fabric/src/fabric_view_changes.erl | 422 ------------------ src/fabric/src/fabric_view_map.erl | 147 ------ src/fabric/src/fabric_view_reduce.erl | 127 ------ 25 files changed, 5017 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/README.md ---------------------------------------------------------------------- diff --git a/src/fabric/README.md b/src/fabric/README.md deleted file mode 100644 index 6df941b..0000000 --- a/src/fabric/README.md +++ /dev/null @@ -1,24 +0,0 @@ -## fabric - -Fabric is a collection of proxy functions for [CouchDB][1] operations in a cluster. These functions are used in [BigCouch][2] as the remote procedure endpoints on each of the cluster nodes. - -For example, creating a database is a straightforward task in standalone CouchDB, but for BigCouch, each node that will store a shard for the database needs to receive and execute a fabric function. The node handling the request also needs to compile the results from each of the nodes and respond accordingly to the client. - -Fabric is used in conjunction with 'Rexi' which is also an application within BigCouch. - -### Getting Started -Fabric requires R13B03 or higher and can be built with [rebar][6], which comes bundled in the repository. - -### License -[Apache 2.0][3] - -### Contact - * [http://cloudant.com][4] - * [info@cloudant.com][5] - -[1]: http://couchdb.apache.org -[2]: http://github.com/cloudant/bigcouch -[3]: http://www.apache.org/licenses/LICENSE-2.0.html -[4]: http://cloudant.com -[5]: mailto:info@cloudant.com -[6]: http://github.com/basho/rebar http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/include/couch_db_tmp.hrl ---------------------------------------------------------------------- diff --git a/src/fabric/include/couch_db_tmp.hrl b/src/fabric/include/couch_db_tmp.hrl deleted file mode 100644 index 96f3a2f..0000000 --- a/src/fabric/include/couch_db_tmp.hrl +++ /dev/null @@ -1,296 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(LOCAL_DOC_PREFIX, "_local/"). --define(DESIGN_DOC_PREFIX0, "_design"). --define(DESIGN_DOC_PREFIX, "_design/"). - --define(MIN_STR, <<"">>). --define(MAX_STR, <<255>>). % illegal utf string - --define(JSON_ENCODE(V), couch_util:json_encode(V)). --define(JSON_DECODE(V), couch_util:json_decode(V)). - --define(b2l(V), binary_to_list(V)). --define(l2b(V), list_to_binary(V)). - --define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). - --define(LOG_DEBUG(Format, Args), twig:log(debug, Format, Args)). --define(LOG_INFO(Format, Args), twig:log(notice, Format, Args)). --define(LOG_ERROR(Format, Args), twig:log(error, Format, Args)). - --record(rev_info, - { - rev, - seq = 0, - deleted = false, - body_sp = nil % stream pointer - }). - --record(doc_info, - { - id = <<"">>, - high_seq = 0, - revs = [] % rev_info - }). - --record(full_doc_info, - {id = <<"">>, - update_seq = 0, - deleted = false, - data_size = 0, - rev_tree = [] - }). - --record(httpd, - {mochi_req, - peer, - method, - path_parts, - db_url_handlers, - user_ctx, - req_body = undefined, - design_url_handlers, - auth, - default_fun, - url_handlers - }). - - --record(doc, - { - id = <<"">>, - revs = {0, []}, - - % the json body object. - body = {[]}, - - atts = [], % attachments - - deleted = false, - - % key/value tuple of meta information, provided when using special options: - % couch_db:open_doc(Db, Id, Options). - meta = [] - }). - - --record(att, - { - name, - type, - att_len, - disk_len, % length of the attachment in its identity form - % (that is, without a content encoding applied to it) - % differs from att_len when encoding /= identity - md5= <<>>, - revpos=0, - data, - encoding=identity % currently supported values are: - % identity, gzip - % additional values to support in the future: - % deflate, compress - }). - - --record(user_ctx, - { - name=null, - roles=[], - handler - }). - -% This should be updated anytime a header change happens that requires more -% than filling in new defaults. -% -% As long the changes are limited to new header fields (with inline -% defaults) added to the end of the record, then there is no need to increment -% the disk revision number. -% -% if the disk revision is incremented, then new upgrade logic will need to be -% added to couch_db_updater:init_db. - --define(LATEST_DISK_VERSION, 5). - --record(db_header, - {disk_version = ?LATEST_DISK_VERSION, - update_seq = 0, - unused = 0, - id_tree_state = nil, - seq_tree_state = nil, - local_tree_state = nil, - purge_seq = 0, - purged_docs = nil, - security_ptr = nil, - revs_limit = 1000 - }). - --record(db, - {main_pid = nil, - update_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = #db_header{}, - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - is_sys_db = false - }). - - --record(view_query_args, { - start_key, - end_key, - start_docid = ?MIN_STR, - end_docid = ?MAX_STR, - - direction = fwd, - inclusive_end=true, % aka a closed-interval - - limit = 10000000000, % Huge number to simplify logic - skip = 0, - - group_level = 0, - - view_type = nil, - include_docs = false, - stale = false, - multi_get = false, - callback = nil, - list = nil, - keys = nil, - sorted = true, - extra = [] -}). - --record(view_fold_helper_funs, { - reduce_count, - passed_end, - start_response, - send_row -}). - --record(reduce_fold_helper_funs, { - start_response, - send_row -}). - --record(extern_resp_args, { - code = 200, - stop = false, - data = <<>>, - ctype = "application/json", - headers = [], - json = nil -}). - --record(group, { - sig=nil, - dbname, - fd=nil, - name, - def_lang, - design_options=[], - views, - id_btree=nil, - current_seq=0, - purge_seq=0, - query_server=nil, - waiting_delayed_commit=nil, - atts=[] - }). - --record(view, - {id_num, - map_names=[], - def, - btree=nil, - reduce_funs=[], - dbcopies=[], - options=[] - }). - --record(index_header, - {seq=0, - purge_seq=0, - id_btree_state=nil, - view_states=nil - }). - --record(http_db, { - url, - auth = [], - resource = "", - headers = [ - {"User-Agent", "CouchDB/"++couch:version()}, - {"Accept", "application/json"}, - {"Accept-Encoding", "gzip"} - ], - qs = [], - method = get, - body = nil, - options = [ - {response_format,binary}, - {inactivity_timeout, 30000} - ], - retries = 10, - pause = 500, - conn = nil -}). - -% small value used in revision trees to indicate the revision isn't stored --define(REV_MISSING, []). - --record(changes_args, { - feed = "normal", - dir = fwd, - since = "0", - limit = 1000000000000000, - style = main_only, - heartbeat, - timeout, - filter, - include_docs = false -}). - --record(proc, { - pid, - lang, - client = nil, - ddoc_keys = [], - prompt_fun, - set_timeout_fun, - stop_fun, - data_fun -}). - --record(leaf, { - deleted, - ptr, - seq, - size = 0, - atts = [] -}). http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/include/fabric.hrl ---------------------------------------------------------------------- diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl deleted file mode 100644 index 94769bd..0000000 --- a/src/fabric/include/fabric.hrl +++ /dev/null @@ -1,36 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --include_lib("eunit/include/eunit.hrl"). - --record(collector, { - db_name=nil, - query_args, - callback, - counters, - buffer_size, - blocked = [], - total_rows = 0, - offset = 0, - rows = [], - skip, - limit, - keys, - os_proc, - reducer, - lang, - sorted, - user_acc -}). - --record(view_row, {key, id, value, doc, worker}). --record(change, {key, id, value, deleted=false, doc, worker}). http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric.app.src ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric.app.src b/src/fabric/src/fabric.app.src deleted file mode 100644 index 5ac86ef..0000000 --- a/src/fabric/src/fabric.app.src +++ /dev/null @@ -1,49 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -{application, fabric, [ - {description, "Routing and proxying layer for CouchDB cluster"}, - {vsn, git}, - {modules, [ - fabric, - fabric_db_create, - fabric_db_delete, - fabric_db_doc_count, - fabric_db_info, - fabric_db_meta, - fabric_db_update_listener, - fabric_dict, - fabric_doc_attachments, - fabric_doc_missing_revs, - fabric_doc_open, - fabric_doc_open_revs, - fabric_doc_update, - fabric_group_info, - fabric_rpc, - fabric_util, - fabric_view, - fabric_view_all_docs, - fabric_view_changes, - fabric_view_map, - fabric_view_reduce - ]}, - {registered, []}, - {applications, [ - kernel, - stdlib, - config, - couch, - rexi, - mem3, - twig - ]} -]}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl deleted file mode 100644 index 1f05ed6..0000000 --- a/src/fabric/src/fabric.erl +++ /dev/null @@ -1,479 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric). - --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). - --define(ADMIN_CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>]}}). - -% DBs --export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, - delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, - set_security/2, set_security/3, get_revs_limit/1, get_security/1, - get_security/2, get_all_security/1, get_all_security/2]). - -% Documents --export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, - update_doc/3, update_docs/3, purge_docs/2, att_receiver/2]). - -% Views --export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, - get_view_group_info/2]). - -% miscellany --export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, - cleanup_index_files/1]). - --include_lib("fabric/include/fabric.hrl"). - --type dbname() :: (iodata() | #db{}). --type docid() :: iodata(). --type revision() :: {integer(), binary()}. --type callback() :: fun((any(), any()) -> {ok | stop, any()}). --type json_obj() :: {[{binary() | atom(), any()}]}. --type option() :: atom() | {atom(), any()}. - -%% db operations -%% @equiv all_dbs(<<>>) -all_dbs() -> - all_dbs(<<>>). - -%% @doc returns a list of all database names --spec all_dbs(Prefix::iodata()) -> {ok, [binary()]}. -all_dbs(Prefix) when is_binary(Prefix) -> - Length = byte_size(Prefix), - MatchingDbs = mem3:fold_shards(fun(#shard{dbname=DbName}, Acc) -> - case DbName of - <> -> - [DbName | Acc]; - _ -> - Acc - end - end, []), - {ok, lists:usort(MatchingDbs)}; - -%% @equiv all_dbs(list_to_binary(Prefix)) -all_dbs(Prefix) when is_list(Prefix) -> - all_dbs(list_to_binary(Prefix)). - -%% @doc returns a property list of interesting properties -%% about the database such as `doc_count', `disk_size', -%% etc. --spec get_db_info(dbname()) -> - {ok, [ - {instance_start_time, binary()} | - {doc_count, non_neg_integer()} | - {doc_del_count, non_neg_integer()} | - {purge_seq, non_neg_integer()} | - {compact_running, boolean()} | - {disk_size, non_neg_integer()} | - {disk_format_version, pos_integer()} - ]}. -get_db_info(DbName) -> - fabric_db_info:go(dbname(DbName)). - -%% @doc the number of docs in a database --spec get_doc_count(dbname()) -> {ok, non_neg_integer()}. -get_doc_count(DbName) -> - fabric_db_doc_count:go(dbname(DbName)). - -%% @equiv create_db(DbName, []) -create_db(DbName) -> - create_db(DbName, []). - -%% @doc creates a database with the given name. -%% -%% Options can include values for q and n, -%% for example `{q, "8"}' and `{n, "3"}', which -%% control how many shards to split a database into -%% and how many nodes each doc is copied to respectively. -%% --spec create_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. -create_db(DbName, Options) -> - fabric_db_create:go(dbname(DbName), opts(Options)). - -%% @equiv delete_db([]) -delete_db(DbName) -> - delete_db(DbName, []). - -%% @doc delete a database --spec delete_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. -delete_db(DbName, Options) -> - fabric_db_delete:go(dbname(DbName), opts(Options)). - -%% @doc provide an upper bound for the number of tracked document revisions --spec set_revs_limit(dbname(), pos_integer(), [option()]) -> ok. -set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> - fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). - -%% @doc retrieves the maximum number of document revisions --spec get_revs_limit(dbname()) -> pos_integer() | no_return(). -get_revs_limit(DbName) -> - {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]), - try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. - -%% @doc sets the readers/writers/admin permissions for a database --spec set_security(dbname(), SecObj::json_obj()) -> ok. -set_security(DbName, SecObj) -> - fabric_db_meta:set_security(dbname(DbName), SecObj, [?ADMIN_CTX]). - -%% @doc sets the readers/writers/admin permissions for a database --spec set_security(dbname(), SecObj::json_obj(), [option()]) -> ok. -set_security(DbName, SecObj, Options) -> - fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). - -get_security(DbName) -> - get_security(DbName, [?ADMIN_CTX]). - -%% @doc retrieve the security object for a database --spec get_security(dbname()) -> json_obj() | no_return(). -get_security(DbName, Options) -> - {ok, Db} = fabric_util:get_db(dbname(DbName), opts(Options)), - try couch_db:get_security(Db) after catch couch_db:close(Db) end. - -%% @doc retrieve the security object for all shards of a database --spec get_all_security(dbname()) -> json_obj() | no_return(). -get_all_security(DbName) -> - get_all_security(DbName, []). - -%% @doc retrieve the security object for all shards of a database --spec get_all_security(dbname(), [option()]) -> json_obj() | no_return(). -get_all_security(DbName, Options) -> - fabric_db_meta:get_all_security(dbname(DbName), opts(Options)). - -% doc operations - -%% @doc retrieve the doc with a given id --spec open_doc(dbname(), docid(), [option()]) -> - {ok, #doc{}} | - {not_found, missing | deleted} | - {timeout, any()} | - {error, any()} | - {error, any() | any()}. -open_doc(DbName, Id, Options) -> - fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). - -%% @doc retrieve a collection of revisions, possible all --spec open_revs(dbname(), docid(), [revision()] | all, [option()]) -> - {ok, [{ok, #doc{}} | {{not_found,missing}, revision()}]} | - {timeout, any()} | - {error, any()} | - {error, any(), any()}. -open_revs(DbName, Id, Revs, Options) -> - fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). - -%% @equiv get_missing_revs(DbName, IdsRevs, []) -get_missing_revs(DbName, IdsRevs) -> - get_missing_revs(DbName, IdsRevs, []). - -%% @doc retrieve missing revisions for a list of `{Id, Revs}' --spec get_missing_revs(dbname(),[{docid(), [revision()]}], [option()]) -> - {ok, [{docid(), any(), [any()]}]}. -get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) -> - Sanitized = [idrevs(IdR) || IdR <- IdsRevs], - fabric_doc_missing_revs:go(dbname(DbName), Sanitized, opts(Options)). - -%% @doc update a single doc -%% @equiv update_docs(DbName,[Doc],Options) --spec update_doc(dbname(), #doc{}, [option()]) -> - {ok, any()} | any(). -update_doc(DbName, Doc, Options) -> - case update_docs(DbName, [Doc], opts(Options)) of - {ok, [{ok, NewRev}]} -> - {ok, NewRev}; - {accepted, [{accepted, NewRev}]} -> - {accepted, NewRev}; - {ok, [{{_Id, _Rev}, Error}]} -> - throw(Error); - {ok, [Error]} -> - throw(Error); - {ok, []} -> - % replication success - #doc{revs = {Pos, [RevId | _]}} = doc(Doc), - {ok, {Pos, RevId}} - end. - -%% @doc update a list of docs --spec update_docs(dbname(), [#doc{}], [option()]) -> - {ok, any()} | any(). -update_docs(DbName, Docs, Options) -> - try - fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of - {ok, Results} -> - {ok, Results}; - {accepted, Results} -> - {accepted, Results}; - Error -> - throw(Error) - catch {aborted, PreCommitFailures} -> - {aborted, PreCommitFailures} - end. - -purge_docs(_DbName, _IdsRevs) -> - not_implemented. - -%% @doc spawns a process to upload attachment data and -%% returns a function that shards can use to communicate -%% with the spawned middleman process --spec att_receiver(#httpd{}, Length :: undefined | chunked | pos_integer() | - {unknown_transfer_encoding, any()}) -> - function() | binary(). -att_receiver(Req, Length) -> - fabric_doc_attachments:receiver(Req, Length). - -%% @doc retrieves all docs. Additional query parameters, such as `limit', -%% `start_key' and `end_key', `descending', and `include_docs', can -%% also be passed to further constrain the query. See -%% all_docs for details --spec all_docs(dbname(), callback(), [] | tuple(), #mrargs{}) -> - {ok, [any()]}. -all_docs(DbName, Callback, Acc0, #mrargs{} = QueryArgs) when - is_function(Callback, 2) -> - fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0); - -%% @doc convenience function that takes a keylist rather than a record -%% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)) -all_docs(DbName, Callback, Acc0, QueryArgs) -> - all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)). - - --spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) -> - {ok, any()}. -changes(DbName, Callback, Acc0, #changes_args{}=Options) -> - Feed = Options#changes_args.feed, - fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0); - -%% @doc convenience function, takes keylist instead of record -%% @equiv changes(DbName, Callback, Acc0, kl_to_changes_args(Options)) -changes(DbName, Callback, Acc0, Options) -> - changes(DbName, Callback, Acc0, kl_to_changes_args(Options)). - -%% @equiv query_view(DbName, DesignName, ViewName, #mrargs{}) -query_view(DbName, DesignName, ViewName) -> - query_view(DbName, DesignName, ViewName, #mrargs{}). - -%% @equiv query_view(DbName, DesignName, -%% ViewName, fun default_callback/2, [], QueryArgs) -query_view(DbName, DesignName, ViewName, QueryArgs) -> - Callback = fun default_callback/2, - query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). - -%% @doc execute a given view. -%% There are many additional query args that can be passed to a view, -%% see -%% query args for details. --spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(), - #mrargs{}) -> - any(). -query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> - Db = dbname(DbName), View = name(ViewName), - case is_reduce_view(Db, Design, View, QueryArgs) of - true -> - Mod = fabric_view_reduce; - false -> - Mod = fabric_view_map - end, - Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). - -%% @doc retrieve info about a view group, disk size, language, whether compaction -%% is running and so forth --spec get_view_group_info(dbname(), #doc{} | docid()) -> - {ok, [ - {signature, binary()} | - {language, binary()} | - {disk_size, non_neg_integer()} | - {compact_running, boolean()} | - {updater_running, boolean()} | - {waiting_commit, boolean()} | - {waiting_clients, non_neg_integer()} | - {update_seq, pos_integer()} | - {purge_seq, non_neg_integer()} - ]}. -get_view_group_info(DbName, DesignId) -> - fabric_group_info:go(dbname(DbName), design_doc(DesignId)). - -%% @doc retrieve all the design docs from a database --spec design_docs(dbname()) -> {ok, [json_obj()]}. -design_docs(DbName) -> - QueryArgs = #mrargs{ - start_key = <<"_design/">>, - end_key = <<"_design0">>, - include_docs=true - }, - Callback = fun({total_and_offset, _, _}, []) -> - {ok, []}; - ({row, {Props}}, Acc) -> - case couch_util:get_value(id, Props) of - <<"_design/", _/binary>> -> - {ok, [couch_util:get_value(doc, Props) | Acc]}; - _ -> - {stop, Acc} - end; - (complete, Acc) -> - {ok, lists:reverse(Acc)}; - ({error, Reason}, _Acc) -> - {error, Reason} - end, - fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). - -%% @doc forces a reload of validation functions, this is performed after -%% design docs are update -%% NOTE: This function probably doesn't belong here as part fo the API --spec reset_validation_funs(dbname()) -> [reference()]. -reset_validation_funs(DbName) -> - [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || - #shard{node=Node, name=Name} <- mem3:shards(DbName)]. - -%% @doc clean up index files for all Dbs --spec cleanup_index_files() -> [ok]. -cleanup_index_files() -> - {ok, Dbs} = fabric:all_dbs(), - [cleanup_index_files(Db) || Db <- Dbs]. - -%% @doc clean up index files for a specific db --spec cleanup_index_files(dbname()) -> ok. -cleanup_index_files(DbName) -> - {ok, DesignDocs} = fabric:design_docs(DbName), - - ActiveSigs = lists:map(fun(#doc{id = GroupId}) -> - {ok, Info} = fabric:get_view_group_info(DbName, GroupId), - binary_to_list(couch_util:get_value(signature, Info)) - end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs]), - - FileList = filelib:wildcard([config:get("couchdb", "view_index_dir"), - "/.shards/*/", couch_util:to_list(dbname(DbName)), ".[0-9]*_design/*"]), - - DeleteFiles = if ActiveSigs =:= [] -> FileList; true -> - {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]), - lists:filter(fun(FilePath) -> - re:run(FilePath, RegExp, [{capture, none}]) == nomatch - end, FileList) - end, - [file:delete(File) || File <- DeleteFiles], - ok. - -%% some simple type validation and transcoding - -dbname(DbName) when is_list(DbName) -> - list_to_binary(DbName); -dbname(DbName) when is_binary(DbName) -> - DbName; -dbname(#db{name=Name}) -> - Name; -dbname(DbName) -> - erlang:error({illegal_database_name, DbName}). - -name(Thing) -> - couch_util:to_binary(Thing). - -docid(DocId) when is_list(DocId) -> - list_to_binary(DocId); -docid(DocId) when is_binary(DocId) -> - DocId; -docid(DocId) -> - erlang:error({illegal_docid, DocId}). - -docs(Docs) when is_list(Docs) -> - [doc(D) || D <- Docs]; -docs(Docs) -> - erlang:error({illegal_docs_list, Docs}). - -doc(#doc{} = Doc) -> - Doc; -doc({_} = Doc) -> - couch_doc:from_json_obj(Doc); -doc(Doc) -> - erlang:error({illegal_doc_format, Doc}). - -design_doc(#doc{} = DDoc) -> - DDoc; -design_doc(DocId) when is_list(DocId) -> - design_doc(list_to_binary(DocId)); -design_doc(<<"_design/", _/binary>> = DocId) -> - DocId; -design_doc(GroupName) -> - <<"_design/", GroupName/binary>>. - -idrevs({Id, Revs}) when is_list(Revs) -> - {docid(Id), [rev(R) || R <- Revs]}. - -rev(Rev) when is_list(Rev); is_binary(Rev) -> - couch_doc:parse_rev(Rev); -rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> - Rev. - -%% @doc convenience method, useful when testing or calling fabric from the shell -opts(Options) -> - add_option(user_ctx, add_option(io_priority, Options)). - -add_option(Key, Options) -> - case couch_util:get_value(Key, Options) of - undefined -> - case erlang:get(Key) of - undefined -> - Options; - Value -> - [{Key, Value} | Options] - end; - _ -> - Options - end. - -default_callback(complete, Acc) -> - {ok, lists:reverse(Acc)}; -default_callback(Row, Acc) -> - {ok, [Row | Acc]}. - -is_reduce_view(_, _, _, #mrargs{view_type=Reduce}) -> - Reduce =:= reduce. - -%% @doc convenience method for use in the shell, converts a keylist -%% to a `changes_args' record -kl_to_changes_args(KeyList) -> - kl_to_record(KeyList, changes_args). - -%% @doc convenience method for use in the shell, converts a keylist -%% to a `mrargs' record -kl_to_query_args(KeyList) -> - kl_to_record(KeyList, mrargs). - -%% @doc finds the index of the given Key in the record. -%% note that record_info is only known at compile time -%% so the code must be written in this way. For each new -%% record type add a case clause -lookup_index(Key,RecName) -> - Indexes = - case RecName of - changes_args -> - lists:zip(record_info(fields, changes_args), - lists:seq(2, record_info(size, changes_args))); - mrargs -> - lists:zip(record_info(fields, mrargs), - lists:seq(2, record_info(size, mrargs))) - end, - couch_util:get_value(Key, Indexes). - -%% @doc convert a keylist to record with given `RecName' -%% @see lookup_index -kl_to_record(KeyList,RecName) -> - Acc0 = case RecName of - changes_args -> #changes_args{}; - mrargs -> #mrargs{} - end, - lists:foldl(fun({Key, Value}, Acc) -> - Index = lookup_index(couch_util:to_existing_atom(Key),RecName), - setelement(Index, Acc, Value) - end, Acc0, KeyList). http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_create.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl deleted file mode 100644 index c8f2d45..0000000 --- a/src/fabric/src/fabric_db_create.erl +++ /dev/null @@ -1,159 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_create). --export([go/2]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"). - -%% @doc Create a new database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q, validate_name -go(DbName, Options) -> - case validate_dbname(DbName, Options) of - ok -> - {Shards, Doc} = generate_shard_map(DbName, Options), - case {create_shard_files(Shards), create_shard_db_doc(Doc)} of - {ok, {ok, Status}} -> - Status; - {file_exists, {ok, _}} -> - {error, file_exists}; - {_, Error} -> - Error - end; - Error -> - Error - end. - -validate_dbname(DbName, Options) -> - case couch_util:get_value(validate_name, Options, true) of - false -> - ok; - true -> - case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of - match -> - ok; - nomatch when DbName =:= <<"_users">> -> - ok; - nomatch when DbName =:= <<"_replicator">> -> - ok; - nomatch -> - {error, illegal_database_name} - end - end. - -generate_shard_map(DbName, Options) -> - {MegaSecs, Secs, _} = now(), - Suffix = "." ++ integer_to_list(MegaSecs*1000000 + Secs), - Shards = mem3:choose_shards(DbName, [{shard_suffix,Suffix} | Options]), - case mem3_util:open_db_doc(DbName) of - {ok, Doc} -> - % the DB already exists, and may have a different Suffix - ok; - {not_found, _} -> - Doc = make_document(Shards, Suffix) - end, - {Shards, Doc}. - -create_shard_files(Shards) -> - Workers = fabric_util:submit_jobs(Shards, create_db, []), - RexiMon = fabric_util:create_monitors(Shards), - try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers) of - {error, file_exists} -> - file_exists; - _ -> - ok - after - rexi_monitor:stop(RexiMon) - end. - -handle_message(file_exists, _, _) -> - {error, file_exists}; - -handle_message({rexi_DOWN, _, {_, Node}, _}, _, Workers) -> - case lists:filter(fun(S) -> S#shard.node =/= Node end, Workers) of - [] -> - {stop, ok}; - RemainingWorkers -> - {ok, RemainingWorkers} - end; - -handle_message(_, Worker, Workers) -> - case lists:delete(Worker, Workers) of - [] -> - {stop, ok}; - RemainingWorkers -> - {ok, RemainingWorkers} - end. - -create_shard_db_doc(Doc) -> - Shards = [#shard{node=N} || N <- mem3:nodes()], - RexiMon = fabric_util:create_monitors(Shards), - Workers = fabric_util:submit_jobs(Shards, create_shard_db_doc, [Doc]), - Acc0 = {length(Shards), fabric_dict:init(Workers, nil)}, - try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of - {timeout, _} -> - {error, timeout}; - Else -> - Else - after - rexi_monitor:stop(RexiMon) - end. - -handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) -> - New = fabric_dict:filter(fun(S, _) -> S#shard.node =/= Node end, Counters), - maybe_stop(W, New); - -handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) -> - maybe_stop(W, fabric_dict:erase(Worker, Counters)); - -handle_db_update(conflict, _, _) -> - % just fail when we get any conflicts - {error, conflict}; - -handle_db_update(Msg, Worker, {W, Counters}) -> - maybe_stop(W, fabric_dict:store(Worker, Msg, Counters)). - -maybe_stop(W, Counters) -> - case fabric_dict:any(nil, Counters) of - true -> - {ok, {W, Counters}}; - false -> - case lists:sum([1 || {_, ok} <- Counters]) of - W -> - {stop, ok}; - NumOk when NumOk >= (W div 2 + 1) -> - {stop, accepted}; - _ -> - {error, internal_server_error} - end - end. - -make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> - {RawOut, ByNodeOut, ByRangeOut} = - lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> - Range = ?l2b([couch_util:to_hex(<>), "-", - couch_util:to_hex(<>)]), - Node = couch_util:to_binary(N), - {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), - orddict:append(Range, Node, ByRange)} - end, {[], [], []}, Shards), - #doc{id=DbName, body = {[ - {<<"shard_suffix">>, Suffix}, - {<<"changelog">>, lists:sort(RawOut)}, - {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} - ]}}. - http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_delete.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_delete.erl b/src/fabric/src/fabric_db_delete.erl deleted file mode 100644 index 934f95b..0000000 --- a/src/fabric/src/fabric_db_delete.erl +++ /dev/null @@ -1,93 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_delete). --export([go/2]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -%% @doc Options aren't used at all now in couch on delete but are left here -%% to be consistent with fabric_db_create for possible future use -%% @see couch_server:delete_db -%% -go(DbName, _Options) -> - Shards = mem3:shards(DbName), - % delete doc from shard_db - try delete_shard_db_doc(DbName) of - {ok, ok} -> - ok; - {ok, accepted} -> - accepted; - {ok, not_found} -> - erlang:error(database_does_not_exist, DbName); - Error -> - Error - after - % delete the shard files - fabric_util:submit_jobs(Shards, delete_db, []) - end. - -delete_shard_db_doc(Doc) -> - Shards = [#shard{node=N} || N <- mem3:nodes()], - RexiMon = fabric_util:create_monitors(Shards), - Workers = fabric_util:submit_jobs(Shards, delete_shard_db_doc, [Doc]), - Acc0 = {length(Shards), fabric_dict:init(Workers, nil)}, - try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of - {timeout, _} -> - {error, timeout}; - Else -> - Else - after - rexi_monitor:stop(RexiMon) - end. - -handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) -> - New = fabric_dict:filter(fun(S, _) -> S#shard.node =/= Node end, Counters), - maybe_stop(W, New); - -handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) -> - maybe_stop(W, fabric_dict:erase(Worker, Counters)); - -handle_db_update(conflict, _, _) -> - % just fail when we get any conflicts - {error, conflict}; - -handle_db_update(Msg, Worker, {W, Counters}) -> - maybe_stop(W, fabric_dict:store(Worker, Msg, Counters)). - -maybe_stop(W, Counters) -> - case fabric_dict:any(nil, Counters) of - true -> - {ok, {W, Counters}}; - false -> - {Ok,NotFound} = fabric_dict:fold(fun count_replies/3, {0,0}, Counters), - case {Ok + NotFound, Ok, NotFound} of - {W, 0, W} -> - {#shard{dbname=Name}, _} = hd(Counters), - twig:log(warn, "~p not_found ~s", [?MODULE, Name]), - {stop, not_found}; - {W, _, _} -> - {stop, ok}; - {N, M, _} when N >= (W div 2 + 1), M > 0 -> - {stop, accepted}; - _ -> - {error, internal_server_error} - end - end. - -count_replies(_, ok, {Ok, NotFound}) -> - {Ok+1, NotFound}; -count_replies(_, not_found, {Ok, NotFound}) -> - {Ok, NotFound+1}; -count_replies(_, _, Acc) -> - Acc. http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_doc_count.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl deleted file mode 100644 index dcc32aa..0000000 --- a/src/fabric/src/fabric_db_doc_count.erl +++ /dev/null @@ -1,66 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_doc_count). - --export([go/1]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), - RexiMon = fabric_util:create_monitors(Shards), - Acc0 = {fabric_dict:init(Workers, nil), 0}, - try - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) - after - rexi_monitor:stop(RexiMon) - end. - -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; - error -> - {error, {nodedown, <<"progress not possible">>}} - end; - -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> - NewCounters = lists:keydelete(Shard, #shard.ref, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, Acc}}; - false -> - {error, Reason} - end; - -handle_message({ok, Count}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, Count+Acc}}; - false -> - {stop, Count+Acc} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_info.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl deleted file mode 100644 index 58139e8..0000000 --- a/src/fabric/src/fabric_db_info.erl +++ /dev/null @@ -1,102 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_info). - --export([go/1]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_db_info, []), - RexiMon = fabric_util:create_monitors(Shards), - Acc0 = {fabric_dict:init(Workers, nil), []}, - try - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) - after - rexi_monitor:stop(RexiMon) - end. - -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; - error -> - {error, {nodedown, <<"progress not possible">>}} - end; - -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> - NewCounters = lists:keydelete(Shard, #shard.ref, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, Acc}}; - false -> - {error, Reason} - end; - -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - Seq = couch_util:get_value(update_seq, Info), - C1 = fabric_dict:store(Shard, Seq, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, [ - {db_name,Name}, - {update_seq, fabric_view_changes:pack_seqs(C2)} | - merge_results(lists:flatten([Info|Acc])) - ]} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - -merge_results(Info) -> - Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, - orddict:new(), Info), - orddict:fold(fun - (doc_count, X, Acc) -> - [{doc_count, lists:sum(X)} | Acc]; - (doc_del_count, X, Acc) -> - [{doc_del_count, lists:sum(X)} | Acc]; - (purge_seq, X, Acc) -> - [{purge_seq, lists:sum(X)} | Acc]; - (compact_running, X, Acc) -> - [{compact_running, lists:member(true, X)} | Acc]; - (disk_size, X, Acc) -> - [{disk_size, lists:sum(X)} | Acc]; - (other, X, Acc) -> - [{other, {merge_other_results(X)}} | Acc]; - (disk_format_version, X, Acc) -> - [{disk_format_version, lists:max(X)} | Acc]; - (_, _, Acc) -> - Acc - end, [{instance_start_time, <<"0">>}], Dict). - -merge_other_results(Results) -> - Dict = lists:foldl(fun({Props}, D) -> - lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, D, Props) - end, orddict:new(), Results), - orddict:fold(fun - (data_size, X, Acc) -> - [{data_size, lists:sum(X)} | Acc]; - (_, _, Acc) -> - Acc - end, [], Dict). http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_meta.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl deleted file mode 100644 index 78a3952..0000000 --- a/src/fabric/src/fabric_db_meta.erl +++ /dev/null @@ -1,157 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_meta). - --export([set_revs_limit/3, set_security/3, get_all_security/2]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(acc, { - workers, - finished, - num_workers -}). - - -set_revs_limit(DbName, Limit, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]), - Handler = fun handle_revs_message/3, - Waiting = length(Workers) - 1, - case fabric_util:recv(Workers, #shard.ref, Handler, Waiting) of - {ok, ok} -> - ok; - Error -> - Error - end. - -handle_revs_message(ok, _, 0) -> - {stop, ok}; -handle_revs_message(ok, _, Waiting) -> - {ok, Waiting - 1}; -handle_revs_message(Error, _, _Waiting) -> - {error, Error}. - - -set_security(DbName, SecObj, Options) -> - Shards = mem3:shards(DbName), - RexiMon = fabric_util:create_monitors(Shards), - Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]), - Handler = fun handle_set_message/3, - Acc = #acc{ - workers=Workers, - finished=[], - num_workers=length(Workers) - }, - try fabric_util:recv(Workers, #shard.ref, Handler, Acc) of - {ok, #acc{finished=Finished}} -> - case check_sec_set(length(Workers), Finished) of - ok -> ok; - Error -> Error - end; - Error -> - Error - after - rexi_monitor:stop(RexiMon) - end. - -handle_set_message({rexi_DOWN, _, {_, Node}, _}, _, #acc{workers=Wrkrs}=Acc) -> - RemWorkers = lists:filter(fun(S) -> S#shard.node =/= Node end, Wrkrs), - maybe_finish_set(Acc#acc{workers=RemWorkers}); -handle_set_message(ok, W, Acc) -> - NewAcc = Acc#acc{ - workers = (Acc#acc.workers -- [W]), - finished = [W | Acc#acc.finished] - }, - maybe_finish_set(NewAcc); -handle_set_message(Error, W, Acc) -> - Dst = {W#shard.node, W#shard.name}, - twig:log(err, "Failed to set security object on ~p :: ~p", [Dst, Error]), - NewAcc = Acc#acc{workers = (Acc#acc.workers -- [W])}, - maybe_finish_set(NewAcc). - -maybe_finish_set(#acc{workers=[]}=Acc) -> - {stop, Acc}; -maybe_finish_set(#acc{finished=Finished, num_workers=NumWorkers}=Acc) -> - case check_sec_set(NumWorkers, Finished) of - ok -> {stop, Acc}; - _ -> {ok, Acc} - end. - -check_sec_set(NumWorkers, SetWorkers) -> - try - check_sec_set_int(NumWorkers, SetWorkers) - catch throw:Reason -> - {error, Reason} - end. - -check_sec_set_int(NumWorkers, SetWorkers) -> - case length(SetWorkers) < ((NumWorkers div 2) + 1) of - true -> throw(no_majority); - false -> ok - end, - % Hack to reuse fabric_view:is_progress_possible/1 - FakeCounters = [{S, 0} || S <- SetWorkers], - case fabric_view:is_progress_possible(FakeCounters) of - false -> throw(no_ring); - true -> ok - end, - ok. - - -get_all_security(DbName, Options) -> - Shards = case proplists:get_value(shards, Options) of - Shards0 when is_list(Shards0) -> Shards0; - _ -> mem3:shards(DbName) - end, - Admin = [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}], - RexiMon = fabric_util:create_monitors(Shards), - Workers = fabric_util:submit_jobs(Shards, get_all_security, [Admin]), - Handler = fun handle_get_message/3, - Acc = #acc{ - workers=Workers, - finished=[], - num_workers=length(Workers) - }, - try fabric_util:recv(Workers, #shard.ref, Handler, Acc) of - {ok, #acc{finished=SecObjs}} when length(SecObjs) > length(Workers) / 2 -> - {ok, SecObjs}; - {ok, _} -> - {error, no_majority}; - Error -> - Error - after - rexi_monitor:stop(RexiMon) - end. - -handle_get_message({rexi_DOWN, _, {_, Node}, _}, _, #acc{workers=Wrkrs}=Acc) -> - RemWorkers = lists:filter(fun(S) -> S#shard.node =/= Node end, Wrkrs), - maybe_finish_get(Acc#acc{workers=RemWorkers}); -handle_get_message({Props}=SecObj, W, Acc) when is_list(Props) -> - NewAcc = Acc#acc{ - workers = (Acc#acc.workers -- [W]), - finished = [{W, SecObj} | Acc#acc.finished] - }, - maybe_finish_get(NewAcc); -handle_get_message(Error, W, Acc) -> - Dst = {W#shard.node, W#shard.name}, - twig:log(err, "Failed to get security object on ~p :: ~p", [Dst, Error]), - NewAcc = Acc#acc{workers = (Acc#acc.workers -- [W])}, - maybe_finish_set(NewAcc). - -maybe_finish_get(#acc{workers=[]}=Acc) -> - {stop, Acc}; -maybe_finish_get(Acc) -> - {ok, Acc}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_db_update_listener.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl deleted file mode 100644 index 28e5972..0000000 --- a/src/fabric/src/fabric_db_update_listener.erl +++ /dev/null @@ -1,150 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_update_listener). - --export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - --record(worker, { - ref, - node, - pid -}). - --record(acc, { - parent, - state -}). - -go(Parent, ParentRef, DbName, Timeout) -> - Notifiers = start_update_notifiers(DbName), - MonRefs = lists:usort([{rexi_server, Node} || {Node, _Ref} <- Notifiers]), - RexiMon = rexi_monitor:start(MonRefs), - MonPid = start_cleanup_monitor(self(), Notifiers), - %% This is not a common pattern for rexi but to enable the calling - %% process to communicate via handle_message/3 we "fake" it as a - %% a spawned worker. - Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers], - Resp = try - receive_results(Workers, #acc{parent=Parent, state=unset}, Timeout) - after - rexi_monitor:stop(RexiMon), - stop_cleanup_monitor(MonPid) - end, - case Resp of - {ok, _} -> ok; - {error, Error} -> erlang:error(Error); - Error -> erlang:error(Error) - end. - -start_update_notifiers(DbName) -> - EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) -> - dict:append(Node, Name, Acc) - end, dict:new(), mem3:shards(DbName)), - lists:map(fun({Node, DbNames}) -> - Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}), - #worker{ref=Ref, node=Node} - end, dict:to_list(EndPointDict)). - -% rexi endpoint -start_update_notifier(DbNames) -> - {Caller, Ref} = get(rexi_from), - Fun = fun({_, X}) -> - case lists:member(X, DbNames) of - true -> erlang:send(Caller, {Ref, db_updated}); - false -> ok - end - end, - Id = {couch_db_update_notifier, make_ref()}, - ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), - receive {gen_event_EXIT, Id, Reason} -> - rexi:reply({gen_event_EXIT, node(), Reason}) - end. - -start_cleanup_monitor(Parent, Notifiers) -> - spawn(fun() -> - Ref = erlang:monitor(process, Parent), - cleanup_monitor(Parent, Ref, Notifiers) - end). - -stop_cleanup_monitor(MonPid) -> - MonPid ! {self(), stop}. - -cleanup_monitor(Parent, Ref, Notifiers) -> - receive - {'DOWN', Ref, _, _, _} -> - stop_update_notifiers(Notifiers); - {Parent, stop} -> - stop_update_notifiers(Notifiers); - Else -> - twig:log(error, "Unkown message in ~w :: ~w", [?MODULE, Else]), - stop_update_notifiers(Notifiers), - exit(Parent, {unknown_message, Else}) - end. - -stop_update_notifiers(Notifiers) -> - [rexi:kill(Node, Ref) || #worker{node=Node, ref=Ref} <- Notifiers]. - -stop({Pid, Ref}) -> - erlang:send(Pid, {Ref, done}). - -wait_db_updated({Pid, Ref}) -> - MonRef = erlang:monitor(process, Pid), - erlang:send(Pid, {Ref, get_state}), - receive - {state, Pid, State} -> - erlang:demonitor(MonRef, [flush]), - State; - {'DOWN', MonRef, process, Pid, Reason} -> - throw({changes_feed_died, Reason}) - end. - -receive_results(Workers, Acc0, Timeout) -> - Fun = fun handle_message/3, - case rexi_utils:recv(Workers, #worker.ref, Fun, Acc0, infinity, Timeout) of - {timeout, #acc{state=updated}=Acc} -> - receive_results(Workers, Acc, Timeout); - {timeout, #acc{state=waiting}=Acc} -> - erlang:send(Acc#acc.parent, {state, self(), timeout}), - receive_results(Workers, Acc#acc{state=unset}, Timeout); - {timeout, Acc} -> - receive_results(Workers, Acc#acc{state=timeout}, Timeout); - {_, Acc} -> - {ok, Acc} - end. - - -handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, _Acc) -> - {error, {nodedown, Node}}; -handle_message({rexi_EXIT, _Reason}, Worker, _Acc) -> - {error, {worker_exit, Worker}}; -handle_message({gen_event_EXIT, Node, Reason}, _Worker, _Acc) -> - {error, {gen_event_exit, Node, Reason}}; -handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) -> - % propagate message to calling controller - erlang:send(Acc#acc.parent, {state, self(), updated}), - {ok, Acc#acc{state=unset}}; -handle_message(db_updated, _Worker, Acc) -> - {ok, Acc#acc{state=updated}}; -handle_message(get_state, _Worker, #acc{state=unset}=Acc) -> - {ok, Acc#acc{state=waiting}}; -handle_message(get_state, _Worker, Acc) -> - erlang:send(Acc#acc.parent, {state, self(), Acc#acc.state}), - {ok, Acc#acc{state=unset}}; -handle_message(done, _, _) -> - {stop, ok}. - - - http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_dict.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_dict.erl b/src/fabric/src/fabric_dict.erl deleted file mode 100644 index a9d7fea..0000000 --- a/src/fabric/src/fabric_dict.erl +++ /dev/null @@ -1,49 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_dict). --compile(export_all). - -% Instead of ets, let's use an ordered keylist. We'll need to revisit if we -% have >> 100 shards, so a private interface is a good idea. - APK June 2010 - -init(Keys, InitialValue) -> - orddict:from_list([{Key, InitialValue} || Key <- Keys]). - - -decrement_all(Dict) -> - [{K,V-1} || {K,V} <- Dict]. - -store(Key, Value, Dict) -> - orddict:store(Key, Value, Dict). - -erase(Key, Dict) -> - orddict:erase(Key, Dict). - -update_counter(Key, Incr, Dict0) -> - orddict:update_counter(Key, Incr, Dict0). - - -lookup_element(Key, Dict) -> - couch_util:get_value(Key, Dict). - -size(Dict) -> - orddict:size(Dict). - -any(Value, Dict) -> - lists:keymember(Value, 2, Dict). - -filter(Fun, Dict) -> - orddict:filter(Fun, Dict). - -fold(Fun, Acc0, Dict) -> - orddict:fold(Fun, Acc0, Dict). http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_doc_attachments.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_doc_attachments.erl b/src/fabric/src/fabric_doc_attachments.erl deleted file mode 100644 index b29e20f..0000000 --- a/src/fabric/src/fabric_doc_attachments.erl +++ /dev/null @@ -1,151 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_attachments). - --include_lib("fabric/include/fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - -%% couch api calls --export([receiver/2]). - -receiver(_Req, undefined) -> - <<"">>; -receiver(_Req, {unknown_transfer_encoding, Unknown}) -> - exit({unknown_transfer_encoding, Unknown}); -receiver(Req, chunked) -> - MiddleMan = spawn(fun() -> middleman(Req, chunked) end), - fun(4096, ChunkFun, ok) -> - write_chunks(MiddleMan, ChunkFun) - end; -receiver(_Req, 0) -> - <<"">>; -receiver(Req, Length) when is_integer(Length) -> - maybe_send_continue(Req), - Middleman = spawn(fun() -> middleman(Req, Length) end), - fun() -> - Middleman ! {self(), gimme_data}, - receive - {Middleman, Data} -> - rexi:reply(attachment_chunk_received), - iolist_to_binary(Data) - after 600000 -> - exit(timeout) - end - end; -receiver(_Req, Length) -> - exit({length_not_integer, Length}). - -%% -%% internal -%% - -maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) -> - case couch_httpd:header_value(Req, "expect") of - undefined -> - ok; - Expect -> - case string:to_lower(Expect) of - "100-continue" -> - MochiReq:start_raw_response({100, gb_trees:empty()}); - _ -> - ok - end - end. - -write_chunks(MiddleMan, ChunkFun) -> - MiddleMan ! {self(), gimme_data}, - receive - {MiddleMan, ChunkRecordList} -> - rexi:reply(attachment_chunk_received), - case flush_chunks(ChunkRecordList, ChunkFun) of - continue -> write_chunks(MiddleMan, ChunkFun); - done -> ok - end - after 600000 -> - exit(timeout) - end. - -flush_chunks([], _ChunkFun) -> - continue; -flush_chunks([{0, _}], _ChunkFun) -> - done; -flush_chunks([Chunk | Rest], ChunkFun) -> - ChunkFun(Chunk, ok), - flush_chunks(Rest, ChunkFun). - -receive_unchunked_attachment(_Req, 0) -> - ok; -receive_unchunked_attachment(Req, Length) -> - receive {MiddleMan, go} -> - Data = couch_httpd:recv(Req, 0), - MiddleMan ! {self(), Data} - end, - receive_unchunked_attachment(Req, Length - size(Data)). - -middleman(Req, chunked) -> - % spawn a process to actually receive the uploaded data - RcvFun = fun(ChunkRecord, ok) -> - receive {From, go} -> From ! {self(), ChunkRecord} end, ok - end, - Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), - - % take requests from the DB writers and get data from the receiver - N = erlang:list_to_integer(config:get("cluster","n")), - middleman_loop(Receiver, N, [], []); - -middleman(Req, Length) -> - Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), - N = erlang:list_to_integer(config:get("cluster","n")), - middleman_loop(Receiver, N, [], []). - -middleman_loop(Receiver, N, Counters0, ChunkList0) -> - receive {From, gimme_data} -> - % Figure out how far along this writer (From) is in the list - ListIndex = case fabric_dict:lookup_element(From, Counters0) of - undefined -> 0; - I -> I - end, - - % Talk to the receiver to get another chunk if necessary - ChunkList1 = if ListIndex == length(ChunkList0) -> - Receiver ! {self(), go}, - receive - {Receiver, ChunkRecord} -> - ChunkList0 ++ [ChunkRecord] - end; - true -> ChunkList0 end, - - % reply to the writer - Reply = lists:nthtail(ListIndex, ChunkList1), - From ! {self(), Reply}, - - % Update the counter for this writer - Counters1 = fabric_dict:update_counter(From, length(Reply), Counters0), - - % Drop any chunks that have been sent to all writers - Size = fabric_dict:size(Counters1), - NumToDrop = lists:min([I || {_, I} <- Counters1]), - - {ChunkList3, Counters3} = - if Size == N andalso NumToDrop > 0 -> - ChunkList2 = lists:nthtail(NumToDrop, ChunkList1), - Counters2 = [{F, I-NumToDrop} || {F, I} <- Counters1], - {ChunkList2, Counters2}; - true -> - {ChunkList1, Counters1} - end, - - middleman_loop(Receiver, N, Counters3, ChunkList3) - after 10000 -> - ok - end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_doc_missing_revs.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_doc_missing_revs.erl b/src/fabric/src/fabric_doc_missing_revs.erl deleted file mode 100644 index ec154ee..0000000 --- a/src/fabric/src/fabric_doc_missing_revs.erl +++ /dev/null @@ -1,88 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_missing_revs). - --export([go/2, go/3]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName, AllIdsRevs) -> - go(DbName, AllIdsRevs, []). - -go(DbName, AllIdsRevs, Options) -> - Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> - Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, - Options]}), - Shard#shard{ref=Ref} - end, group_idrevs_by_shard(DbName, AllIdsRevs)), - ResultDict = dict:from_list([{Id, {{nil,Revs},[]}} || {Id, Revs} <- AllIdsRevs]), - RexiMon = fabric_util:create_monitors(Workers), - Acc0 = {length(Workers), ResultDict, Workers}, - try - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) - after - rexi_monitor:stop(RexiMon) - end. - -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {_WorkerLen, ResultDict, Workers}) -> - NewWorkers = [W || #shard{node=Node} = W <- Workers, Node =/= NodeRef], - skip_message({fabric_dict:size(NewWorkers), ResultDict, NewWorkers}); -handle_message({rexi_EXIT, _}, Worker, {W, D, Workers}) -> - skip_message({W-1,D,lists:delete(Worker, Workers)}); -handle_message({ok, Results}, _Worker, {1, D0, _}) -> - D = update_dict(D0, Results), - {stop, dict:fold(fun force_reply/3, [], D)}; -handle_message({ok, Results}, Worker, {WaitingCount, D0, Workers}) -> - D = update_dict(D0, Results), - case dict:fold(fun maybe_reply/3, {stop, []}, D) of - continue -> - % still haven't heard about some Ids - {ok, {WaitingCount - 1, D, lists:delete(Worker,Workers)}}; - {stop, FinalReply} -> - % finished, stop the rest of the jobs - fabric_util:cleanup(lists:delete(Worker,Workers)), - {stop, FinalReply} - end. - -force_reply(Id, {{nil,Revs}, Anc}, Acc) -> - % never heard about this ID, assume it's missing - [{Id, Revs, Anc} | Acc]; -force_reply(_, {[], _}, Acc) -> - Acc; -force_reply(Id, {Revs, Anc}, Acc) -> - [{Id, Revs, Anc} | Acc]. - -maybe_reply(_, _, continue) -> - continue; -maybe_reply(_, {{nil, _}, _}, _) -> - continue; -maybe_reply(_, {[], _}, {stop, Acc}) -> - {stop, Acc}; -maybe_reply(Id, {Revs, Anc}, {stop, Acc}) -> - {stop, [{Id, Revs, Anc} | Acc]}. - -group_idrevs_by_shard(DbName, IdsRevs) -> - dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, {Id, Revs}, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), IdsRevs)). - -update_dict(D0, KVs) -> - lists:foldl(fun({K,V,A}, D1) -> dict:store(K, {V,A}, D1) end, D0, KVs). - -skip_message({0, Dict, _Workers}) -> - {stop, dict:fold(fun force_reply/3, [], Dict)}; -skip_message(Acc) -> - {ok, Acc}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_doc_open.erl ---------------------------------------------------------------------- diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl deleted file mode 100644 index caa389e..0000000 --- a/src/fabric/src/fabric_doc_open.erl +++ /dev/null @@ -1,470 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_open). - --export([go/3]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - - --record(acc, { - dbname, - workers, - r, - state, - replies, - q_reply -}). - - -go(DbName, Id, Options) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, - [Id, [deleted|Options]]), - SuppressDeletedDoc = not lists:member(deleted, Options), - N = mem3:n(DbName), - R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))), - Acc0 = #acc{ - dbname = DbName, - workers = Workers, - r = erlang:min(N, list_to_integer(R)), - state = r_not_met, - replies = [] - }, - RexiMon = fabric_util:create_monitors(Workers), - try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, #acc{}=Acc} -> - Reply = handle_response(Acc), - format_reply(Reply, SuppressDeletedDoc); - Error -> - Error - after - rexi_monitor:stop(RexiMon) - end. - -handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> - NewWorkers = [W || #shard{node=N}=W <- Acc#acc.workers, N /= Node], - case NewWorkers of - [] -> - {stop, Acc#acc{workers=[]}}; - _ -> - {ok, Acc#acc{workers=NewWorkers}} - end; -handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> - NewWorkers = lists:delete(Worker, Acc#acc.workers), - case NewWorkers of - [] -> - {stop, Acc#acc{workers=[]}}; - _ -> - {ok, Acc#acc{workers=NewWorkers}} - end; -handle_message(Reply, Worker, Acc) -> - NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies), - NewAcc = Acc#acc{replies = NewReplies}, - case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of - {true, QuorumReply} -> - fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)), - {stop, NewAcc#acc{workers=[], state=r_met, q_reply=QuorumReply}}; - wait_for_more -> - NewWorkers = lists:delete(Worker, Acc#acc.workers), - {ok, NewAcc#acc{workers=NewWorkers}}; - no_more_workers -> - {stop, NewAcc#acc{workers=[]}} - end. - -handle_response(#acc{state=r_met, replies=Replies, q_reply=QuorumReply}=Acc) -> - case {Replies, fabric_util:remove_ancestors(Replies, [])} of - {[_], [_]} -> - % Complete agreement amongst all copies - QuorumReply; - {[_|_], [{_, {QuorumReply, _}}]} -> - % Any divergent replies are ancestors of the QuorumReply, - % repair the document asynchronously - spawn(fun() -> read_repair(Acc) end), - QuorumReply; - _Else -> - % real disagreement amongst the workers, block for the repair - read_repair(Acc) - end; -handle_response(Acc) -> - read_repair(Acc). - -is_r_met(Workers, Replies, R) -> - case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, Replies) of - [{_,{QuorumReply, _}} | _] -> - {true, QuorumReply}; - [] when length(Workers) > 1 -> - wait_for_more; - [] -> - no_more_workers - end. - -read_repair(#acc{dbname=DbName, replies=Replies}) -> - Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies], - case Docs of - % omit local docs from read repair - [#doc{id = <>} | _] -> - choose_reply(Docs); - [#doc{id=Id} | _] -> - Ctx = #user_ctx{roles=[<<"_admin">>]}, - Opts = [replicated_changes, {user_ctx, Ctx}], - Res = fabric:update_docs(DbName, Docs, Opts), - twig:log(notice, "read_repair ~s ~s ~p", [DbName, Id, Res]), - choose_reply(Docs); - [] -> - % Try hard to return some sort of information - % to the client. - Values = [V || {_, {V, _}} <- Replies], - case lists:member({not_found, missing}, Values) of - true -> - {not_found, missing}; - false when length(Values) > 0 -> - % Sort for stability in responses in - % case we have some weird condition - hd(lists:sort(Values)); - false -> - {error, read_failure} - end - end. - -choose_reply(Docs) -> - % Sort descending by {not deleted, rev}. This should match - % the logic of couch_doc:to_doc_info/1. - [Winner | _] = lists:sort(fun(DocA, DocB) -> - InfoA = {not DocA#doc.deleted, DocA#doc.revs}, - InfoB = {not DocB#doc.deleted, DocB#doc.revs}, - InfoA > InfoB - end, Docs), - {ok, Winner}. - -format_reply({ok, #doc{deleted=true}}, true) -> - {not_found, deleted}; -format_reply(Else, _) -> - Else. - - -is_r_met_test() -> - Workers0 = [], - Workers1 = [nil], - Workers2 = [nil,nil], - - % Successful cases - - ?assertEqual( - {true, foo}, - is_r_met([], [fabric_util:kv(foo,2)], 2) - ), - - ?assertEqual( - {true, foo}, - is_r_met([], [fabric_util:kv(foo,3)], 2) - ), - - ?assertEqual( - {true, foo}, - is_r_met([], [fabric_util:kv(foo,1)], 1) - ), - - ?assertEqual( - {true, foo}, - is_r_met([], [fabric_util:kv(foo,2), fabric_util:kv(bar,1)], 2) - ), - - ?assertEqual( - {true, bar}, - is_r_met([], [fabric_util:kv(bar,1), fabric_util:kv(bar,2)], 2) - ), - - ?assertEqual( - {true, bar}, - is_r_met([], [fabric_util:kv(bar,2), fabric_util:kv(foo,1)], 2) - ), - - % Not met, but wait for more messages - - ?assertEqual( - wait_for_more, - is_r_met(Workers2, [fabric_util:kv(foo,1)], 2) - ), - - ?assertEqual( - wait_for_more, - is_r_met(Workers2, [fabric_util:kv(foo,2)], 3) - ), - - ?assertEqual( - wait_for_more, - is_r_met(Workers2, [fabric_util:kv(foo,1), fabric_util:kv(bar,1)], 2) - ), - - % Not met, bail out - - ?assertEqual( - no_more_workers, - is_r_met(Workers0, [fabric_util:kv(foo,1)], 2) - ), - - ?assertEqual( - no_more_workers, - is_r_met(Workers1, [fabric_util:kv(foo,1)], 2) - ), - - ?assertEqual( - no_more_workers, - is_r_met(Workers1, [fabric_util:kv(foo,1), fabric_util:kv(bar,1)], 2) - ), - - ?assertEqual( - no_more_workers, - is_r_met(Workers1, [fabric_util:kv(foo,2)], 3) - ), - - ok. - -handle_message_down_test() -> - Node0 = 'foo@localhost', - Node1 = 'bar@localhost', - Down0 = {rexi_DOWN, nil, {nil, Node0}, nil}, - Down1 = {rexi_DOWN, nil, {nil, Node1}, nil}, - Workers0 = [#shard{node=Node0} || _ <- [a, b]], - Worker1 = #shard{node=Node1}, - Workers1 = Workers0 ++ [Worker1], - - % Stop when no more workers are left - ?assertEqual( - {stop, #acc{workers=[]}}, - handle_message(Down0, nil, #acc{workers=Workers0}) - ), - - % Continue when we have more workers - ?assertEqual( - {ok, #acc{workers=[Worker1]}}, - handle_message(Down0, nil, #acc{workers=Workers1}) - ), - - % A second DOWN removes the remaining workers - ?assertEqual( - {stop, #acc{workers=[]}}, - handle_message(Down1, nil, #acc{workers=[Worker1]}) - ), - - ok. - -handle_message_exit_test() -> - Exit = {rexi_EXIT, nil}, - Worker0 = #shard{ref=erlang:make_ref()}, - Worker1 = #shard{ref=erlang:make_ref()}, - - % Only removes the specified worker - ?assertEqual( - {ok, #acc{workers=[Worker1]}}, - handle_message(Exit, Worker0, #acc{workers=[Worker0, Worker1]}) - ), - - ?assertEqual( - {ok, #acc{workers=[Worker0]}}, - handle_message(Exit, Worker1, #acc{workers=[Worker0, Worker1]}) - ), - - % We bail if it was the last worker - ?assertEqual( - {stop, #acc{workers=[]}}, - handle_message(Exit, Worker0, #acc{workers=[Worker0]}) - ), - - ok. - -handle_message_reply_test() -> - start_meck_(), - meck:expect(rexi, kill, fun(_, _) -> ok end), - - Worker0 = #shard{ref=erlang:make_ref()}, - Worker1 = #shard{ref=erlang:make_ref()}, - Worker2 = #shard{ref=erlang:make_ref()}, - Workers = [Worker0, Worker1, Worker2], - Acc0 = #acc{workers=Workers, r=2, replies=[]}, - - % Test that we continue when we haven't met R yet - ?assertEqual( - {ok, Acc0#acc{ - workers=[Worker0, Worker1], - replies=[fabric_util:kv(foo,1)] - }}, - handle_message(foo, Worker2, Acc0) - ), - - ?assertEqual( - {ok, Acc0#acc{ - workers=[Worker0, Worker1], - replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)] - }}, - handle_message(bar, Worker2, Acc0#acc{ - replies=[fabric_util:kv(foo,1)] - }) - ), - - % Test that we don't get a quorum when R isn't met. q_reply - % isn't set and state remains unchanged and {stop, NewAcc} - % is returned. Bit subtle on the assertions here. - - ?assertEqual( - {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}}, - handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]}) - ), - - ?assertEqual( - {stop, Acc0#acc{ - workers=[], - replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)] - }}, - handle_message(bar, Worker0, Acc0#acc{ - workers=[Worker0], - replies=[fabric_util:kv(foo,1)] - }) - ), - - % Check that when R is met we stop with a new state and - % a q_reply. - - ?assertEqual( - {stop, Acc0#acc{ - workers=[], - replies=[fabric_util:kv(foo,2)], - state=r_met, - q_reply=foo - }}, - handle_message(foo, Worker1, Acc0#acc{ - workers=[Worker0, Worker1], - replies=[fabric_util:kv(foo,1)] - }) - ), - - ?assertEqual( - {stop, Acc0#acc{ - workers=[], - r=1, - replies=[fabric_util:kv(foo,1)], - state=r_met, - q_reply=foo - }}, - handle_message(foo, Worker0, Acc0#acc{r=1}) - ), - - ?assertEqual( - {stop, Acc0#acc{ - workers=[], - replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)], - state=r_met, - q_reply=foo - }}, - handle_message(foo, Worker0, Acc0#acc{ - workers=[Worker0], - replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)] - }) - ), - - stop_meck_(), - ok. - -read_repair_test() -> - start_meck_(), - meck:expect(twig, log, fun(_, _, _) -> ok end), - - Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, - NFM = {not_found, missing}, - - % Test when we have actual doc data to repair - - meck:expect(fabric, update_docs, fun(_, [_], _) -> {ok, []} end), - Acc0 = #acc{ - dbname = <<"name">>, - replies = [fabric_util:kv(Foo1,1)] - }, - ?assertEqual(Foo1, read_repair(Acc0)), - - meck:expect(fabric, update_docs, fun(_, [_, _], _) -> {ok, []} end), - Acc1 = #acc{ - dbname = <<"name">>, - replies = [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)] - }, - ?assertEqual(Foo2, read_repair(Acc1)), - - % Test when we have nothing but errors - - Acc2 = #acc{replies=[fabric_util:kv(NFM, 1)]}, - ?assertEqual(NFM, read_repair(Acc2)), - - Acc3 = #acc{replies=[fabric_util:kv(NFM,1), fabric_util:kv(foo,2)]}, - ?assertEqual(NFM, read_repair(Acc3)), - - Acc4 = #acc{replies=[fabric_util:kv(foo,1), fabric_util:kv(bar,1)]}, - ?assertEqual(bar, read_repair(Acc4)), - - stop_meck_(), - ok. - -handle_response_quorum_met_test() -> - start_meck_(), - meck:expect(twig, log, fun(_, _, _) -> ok end), - meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end), - - Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}}, - - BasicOkAcc = #acc{ - state=r_met, - replies=[fabric_util:kv(Foo1,2)], - q_reply=Foo1 - }, - ?assertEqual(Foo1, handle_response(BasicOkAcc)), - - WithAncestorsAcc = #acc{ - state=r_met, - replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,2)], - q_reply=Foo2 - }, - ?assertEqual(Foo2, handle_response(WithAncestorsAcc)), - - % This also checks when the quorum isn't the most recent - % revision. - DeeperWinsAcc = #acc{ - state=r_met, - replies=[fabric_util:kv(Foo1,2), fabric_util:kv(Foo2,1)], - q_reply=Foo1 - }, - ?assertEqual(Foo2, handle_response(DeeperWinsAcc)), - - % Check that we return the proper doc based on rev - % (ie, pos is equal) - BiggerRevWinsAcc = #acc{ - state=r_met, - replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Bar1,2)], - q_reply=Bar1 - }, - ?assertEqual(Foo1, handle_response(BiggerRevWinsAcc)), - - % r_not_met is a proxy to read_repair so we rely on - % read_repair_test for those conditions. - - stop_meck_(), - ok. - - -start_meck_() -> - meck:new([twig, rexi, fabric]). - -stop_meck_() -> - meck:unload([twig, rexi, fabric]).