Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A8B5F108DB for ; Tue, 4 Feb 2014 23:51:51 +0000 (UTC) Received: (qmail 54199 invoked by uid 500); 4 Feb 2014 23:47:09 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 52310 invoked by uid 500); 4 Feb 2014 23:44:35 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 52066 invoked by uid 99); 4 Feb 2014 23:44:17 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Feb 2014 23:44:17 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CD54491B0B9; Tue, 4 Feb 2014 23:44:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davisp@apache.org To: commits@couchdb.apache.org Date: Tue, 04 Feb 2014 23:44:20 -0000 Message-Id: In-Reply-To: <6f807b3972824f1c95b927d0faf2f925@git.apache.org> References: <6f807b3972824f1c95b927d0faf2f925@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/44] Remove src/couch http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_key_tree.erl ---------------------------------------------------------------------- diff --git a/src/couch/src/couch_key_tree.erl b/src/couch/src/couch_key_tree.erl deleted file mode 100644 index a7f6bb2..0000000 --- a/src/couch/src/couch_key_tree.erl +++ /dev/null @@ -1,422 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. 
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -%% @doc Data structure used to represent document edit histories. - -%% A key tree is used to represent the edit history of a document. Each node of -%% the tree represents a particular version. Relations between nodes represent -%% the order that these edits were applied. For instance, a set of three edits -%% would produce a tree of versions A->B->C indicating that edit C was based on -%% version B which was in turn based on A. In a world without replication (and -%% no ability to disable MVCC checks), all histories would be forced to be -%% linear lists of edits due to constraints imposed by MVCC (i.e., new edits must -%% be based on the current version). However, we have replication, so we must -%% deal with not so easy cases, which lead to trees. -%% -%% Consider a document in state A. This doc is replicated to a second node. We -%% then edit the document on each node leaving it in two different states, B -%% and C. We now have two key trees, A->B and A->C. When we go to replicate a -%% second time, the key tree must combine these two trees which gives us -%% A->(B|C). This is how conflicts are introduced. In terms of the key tree, we -%% say that we have two leaves (B and C) that are not deleted. The presence of -%% multiple leaves indicates a conflict. To remove a conflict, one of the -%% edits (B or C) can be deleted, which results in A->(B|C->D) where D is an -%% edit that is specially marked with a deleted=true flag. 
%%
%% What makes this a bit more complicated is that there is a limit to the
%% number of revisions kept, specified in couch_db.hrl (default is 1000). When
%% this limit is exceeded only the last 1000 are kept. This comes into play
%% when branches are merged. The comparison has to begin at the same place in
%% the branches. A revision id is of the form N-XXXXXXX where N is the current
%% revision. So each path will have a start number, calculated in
%% couch_doc:to_path using the formula N - length(RevIds) + 1. So, e.g., if a
%% doc was edited 1003 times this start number would be 4, indicating that 3
%% revisions were truncated.
%%
%% This comes into play in @see merge_at/3 which recursively walks down one
%% tree or the other until they begin at the same revision.

-module(couch_key_tree).

-export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
-export([get_all_leafs/1, count_leafs/1, remove_leafs/2, get_all_leafs_full/1, stem/2]).
-export([map/2, mapfold/3, map_leafs/2, fold/3]).

-include_lib("couch/include/couch_db.hrl").

%% @doc Merge a path into a list of paths, then stem the result to Depth.
%% The conflict flag is the one reported by merge/2; stemming does not
%% change it.
-spec merge([path()], path(), pos_integer()) -> {[path()],
                                                 conflicts | no_conflicts}.
merge(Paths, Path, Depth) ->
    {Merged, Conflicts} = merge(Paths, Path),
    {stem(Merged, Depth), Conflicts}.

%% @doc Merge a path with an existing list of paths, returning a new, sorted
%% list of paths. `conflicts' is returned when merge_one/4 reports a new
%% conflict, or when the number of paths changed and more than one path
%% remains. Conflicts may already exist in the original list of paths.
-spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}.
merge(Paths, Path) ->
    {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false),
    MergedCount = length(Merged),
    PathCountChanged =
        (MergedCount =/= length(Paths)) andalso (MergedCount =/= 1),
    Conflicts =
        case HasConflicts orelse PathCountChanged of
            true -> conflicts;
            false -> no_conflicts
        end,
    {lists:sort(Merged), Conflicts}.
%% Try to graft the inserted path onto each existing path in turn. The first
%% path that accepts it ends the search; paths already visited are kept in the
%% accumulator. When no path accepts it, the inserted path is added as a new
%% path of its own.
-spec merge_one(Original::[path()], Inserted::path(), [path()], boolean()) ->
    {ok, Merged::[path()], NewConflicts::boolean()}.
merge_one([], Inserted, Visited, ConflictsSoFar) ->
    {ok, [Inserted | Visited], ConflictsSoFar};
merge_one([{Start, Tree} = Current | Remaining], {InsStart, InsTree} = Inserted,
        Visited, ConflictsSoFar) ->
    case merge_at([Tree], InsStart - Start, [InsTree]) of
        {ok, [Grafted], NewConflicts} ->
            GraftedPath = {erlang:min(Start, InsStart), Grafted},
            {ok, Remaining ++ [GraftedPath | Visited],
                NewConflicts or ConflictsSoFar};
        no ->
            merge_one(Remaining, Inserted, [Current | Visited], ConflictsSoFar)
    end.

%% Merge the insert tree into our tree. Place is the insert's start position
%% minus ours: positive means we must descend into our tree first, negative
%% means we must descend into the insert tree first, zero means both trees
%% start at the same revision position. Returns `no' when the trees share no
%% node at which they can be joined.
-spec merge_at(tree(), Place::integer(), tree()) ->
    {ok, Merged::tree(), HasConflicts::boolean()} | no.
merge_at(_Ours, _Place, []) ->
    no;
merge_at([], _Place, _Insert) ->
    no;
merge_at([{Key, Value, Children} | Siblings], Place, Insert) when Place > 0 ->
    % insert starts later than ours: drill into our subtree
    case merge_at(Children, Place - 1, Insert) of
        {ok, NewChildren, Conflicts} ->
            {ok, [{Key, Value, NewChildren} | Siblings], Conflicts};
        no ->
            % this branch did not take the insert, try the next sibling
            case merge_at(Siblings, Place, Insert) of
                {ok, NewSiblings, Conflicts} ->
                    {ok, [{Key, Value, Children} | NewSiblings], Conflicts};
                no ->
                    no
            end
    end;
merge_at(Ours, Place, [{Key, Value, InsChildren}]) when Place < 0 ->
    % insert starts earlier than ours: drill into the insert's subtree
    case merge_at(Ours, Place + 1, InsChildren) of
        {ok, Merged, Conflicts} ->
            {ok, [{Key, Value, Merged}], Conflicts};
        no ->
            no
    end;
merge_at([{Key, OurValue, OurChildren} | Siblings], 0,
        [{Key, InsValue, InsChildren}]) ->
    % same key at the same position: join the two subtrees here
    {Merged, Conflicts} = merge_simple(OurChildren, InsChildren),
    {ok, [{Key, value_pref(OurValue, InsValue), Merged} | Siblings], Conflicts};
merge_at([{OurKey, _, _} | _], 0, [{InsKey, _, _}]) when OurKey > InsKey ->
    % sibling keys are ordered, so no later sibling can match
    no;
merge_at([Skipped | Siblings], 0, Insert) ->
    case merge_at(Siblings, 0, Insert) of
        {ok, Merged, Conflicts} ->
            {ok, [Skipped | Merged], Conflicts};
        no ->
            no
    end.

% key tree functions

%% Merge two sibling lists that start at the same position. Nodes with equal
%% keys are merged recursively; all other nodes are interleaved. The boolean
%% is true when the merged list gained a branch along the way.
-spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::boolean()}.
merge_simple([], Theirs) ->
    {Theirs, false};
merge_simple(Ours, []) ->
    {Ours, false};
merge_simple([{Key, OurValue, OurSub} | OurRest],
        [{Key, TheirValue, TheirSub} | TheirRest]) ->
    {MergedSub, SubConflict} = merge_simple(OurSub, TheirSub),
    {MergedRest, RestConflict} = merge_simple(OurRest, TheirRest),
    Value = value_pref(OurValue, TheirValue),
    {[{Key, Value, MergedSub} | MergedRest], SubConflict or RestConflict};
merge_simple([{OurKey, _, _} = Node | OurRest],
        [{TheirKey, _, _} | _] = Theirs) when OurKey < TheirKey ->
    {Merged, Conflict} = merge_simple(OurRest, Theirs),
    % more branches in Merged than we had means a new conflict was added
    {[Node | Merged], Conflict orelse (length(Merged) > length(OurRest))};
merge_simple(Ours, [Node | TheirRest]) ->
    {Merged, Conflict} = merge_simple(Ours, TheirRest),
    {[Node | Merged], Conflict orelse (length(Merged) > length(TheirRest))}.

%% Return the {Pos, Key} search keys that are not found in any of the given
%% paths. Keys positioned before a path's start cannot be in that path and
%% are carried over to the next path unchanged.
find_missing(_Tree, []) ->
    [];
find_missing([], SearchKeys) ->
    SearchKeys;
find_missing([{Start, {Key, Value, SubTree}} | RestTree], SearchKeys) ->
    Reachable = [{Pos, Id} || {Pos, Id} <- SearchKeys, Pos >= Start],
    Unreachable = [{Pos, Id} || {Pos, Id} <- SearchKeys, Pos < Start],
    StillMissing =
        find_missing_simple(Start, [{Key, Value, SubTree}], Reachable),
    find_missing(RestTree, Unreachable ++ StillMissing).
%% Walk one level of sibling nodes at position Pos and strike every search key
%% that names a node in this subtree. Keys positioned before Pos are passed
%% through untouched.
find_missing_simple(_Pos, _Tree, []) ->
    [];
find_missing_simple(_Pos, [], SearchKeys) ->
    SearchKeys;
find_missing_simple(Pos, [{Key, _, Children} | Siblings], SearchKeys) ->
    Reachable = [{P, K} || {P, K} <- SearchKeys, P >= Pos],
    Unreachable = [{P, K} || {P, K} <- SearchKeys, P < Pos],
    WithoutThisNode = Reachable -- [{Pos, Key}],
    AfterChildren = find_missing_simple(Pos + 1, Children, WithoutThisNode),
    Unreachable ++ find_missing_simple(Pos, Siblings, AfterChildren).


%% Split leaf paths into those to keep and the {Pos, LeafKey} ids that were
%% removed. A key is consumed by the first leaf it matches.
filter_leafs([], _Keys, Kept, Removed) ->
    {Kept, Removed};
filter_leafs([{Pos, [{LeafKey, _} | _]} = Path | Rest], Keys, Kept, Removed) ->
    LeafId = {Pos, LeafKey},
    case lists:delete(LeafId, Keys) of
        Unchanged when Unchanged == Keys ->
            % this leaf is not one we were asked to remove
            filter_leafs(Rest, Keys, [Path | Kept], Removed);
        Remaining ->
            % matched: drop the branch and consume the key
            filter_leafs(Rest, Remaining, Kept, [LeafId | Removed])
    end.

% Removes any branches from the tree whose leaf node(s) are in the Keys.
% Returns the rebuilt trees together with the keys that were actually removed.
remove_leafs(Trees, Keys) ->
    % flatten every branch into a leaf-first path and drop the requested ones
    {KeptPaths, RemovedKeys} =
        filter_leafs(get_all_leafs_full(Trees), Keys, [], []),

    % re-key each path by its start position instead of its leaf position
    StartPaths = lists:sort(
        [{Pos + 1 - length(Path), Path} || {Pos, Path} <- KeptPaths]),

    % rebuild a single-branch tree per path and merge them back together
    WrapNode = fun({K, V}, Children) -> [{K, V, Children}] end,
    Graft = fun({StartPos, Path}, TreeAcc) ->
        [Branch] = lists:foldl(WrapNode, [], Path),
        {Merged, _} = merge(TreeAcc, {StartPos, Branch}),
        Merged
    end,
    {lists:foldl(Graft, [], StartPaths), RemovedKeys}.


% get the leafs in the tree matching the keys. The matching key nodes can be
% leafs or inner nodes. If an inner node, then the leafs for that node
% are returned.
%% Returns {Leafs, KeysNotFound}; each leaf has the shape produced by
%% get_all_leafs_simple/3.
get_key_leafs(Tree, Keys) ->
    get_key_leafs(Tree, Keys, []).

get_key_leafs(_, [], Found) ->
    {Found, []};
get_key_leafs([], Keys, Found) ->
    {Found, Keys};
get_key_leafs([{Pos, Tree} | Rest], Keys, Found) ->
    {FoundHere, KeysLeft} = get_key_leafs_simple(Pos, [Tree], Keys, []),
    get_key_leafs(Rest, KeysLeft, FoundHere ++ Found).

%% Search one sibling list. When a node matches a key, all leafs below it are
%% collected and any keys naming those leafs are consumed as well.
get_key_leafs_simple(_Pos, _Tree, [], _KeyPathAcc) ->
    {[], []};
get_key_leafs_simple(_Pos, [], KeysToGet, _KeyPathAcc) ->
    {[], KeysToGet};
get_key_leafs_simple(Pos, [{Key, _Value, Children} = Node | Siblings],
        KeysToGet, KeyPathAcc) ->
    case lists:delete({Pos, Key}, KeysToGet) of
        KeysToGet ->
            % list unchanged: this node was not asked for, keep descending
            {BelowLeafs, KeysAfterBelow} = get_key_leafs_simple(
                Pos + 1, Children, KeysToGet, [Key | KeyPathAcc]),
            {SiblingLeafs, KeysLeft} = get_key_leafs_simple(
                Pos, Siblings, KeysAfterBelow, KeyPathAcc),
            {BelowLeafs ++ SiblingLeafs, KeysLeft};
        KeysWithoutNode ->
            NodeLeafs = get_all_leafs_simple(Pos, [Node], KeyPathAcc),
            NodeLeafIds = [{LeafPos, LeafRev}
                || {_, {LeafPos, [LeafRev | _]}} <- NodeLeafs],
            KeysAfterNode = KeysWithoutNode -- NodeLeafIds,
            {SiblingLeafs, KeysLeft} = get_key_leafs_simple(
                Pos, Siblings, KeysAfterNode, KeyPathAcc),
            {NodeLeafs ++ SiblingLeafs, KeysLeft}
    end.

%% Look up the nodes named by KeysToGet. Each hit is returned as
%% {Value, {Pos, KeyPath}} where KeyPath lists the keys from the node back
%% towards the root.
get(Tree, KeysToGet) ->
    {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
    Found = [{Value, {Pos, [Key || {Key, _} <- Path]}}
        || {Pos, [{_, Value} | _] = Path} <- KeyPaths],
    {Found, KeysNotFound}.

%% Returns {Paths, KeysNotFound}; each path is {Pos, [{Key, Value}, ...]}
%% starting at the matching node and running back towards the root.
get_full_key_paths(Tree, Keys) ->
    get_full_key_paths(Tree, Keys, []).

get_full_key_paths(_, [], Found) ->
    {Found, []};
get_full_key_paths([], Keys, Found) ->
    {Found, Keys};
get_full_key_paths([{Pos, Tree} | Rest], Keys, Found) ->
    {FoundHere, KeysLeft} = get_full_key_paths(Pos, [Tree], Keys, []),
    get_full_key_paths(Rest, KeysLeft, FoundHere ++ Found).
%% Search one sibling list for the requested {Pos, Key} ids. A matching node
%% is reported with its full {Key, Value} path; the search continues below it
%% and across its siblings with the remaining keys.
get_full_key_paths(_Pos, _Tree, [], _KeyPathAcc) ->
    {[], []};
get_full_key_paths(_Pos, [], KeysToGet, _KeyPathAcc) ->
    {[], KeysToGet};
get_full_key_paths(Pos, [{KeyId, Value, Children} | Siblings], KeysToGet,
        KeyPathAcc) ->
    PathToNode = [{KeyId, Value} | KeyPathAcc],
    KeysWithoutNode = KeysToGet -- [{Pos, KeyId}],
    NodeResult =
        case length(KeysWithoutNode) =:= length(KeysToGet) of
            true ->
                % nothing was removed, so this node was not requested
                [];
            false ->
                % this node is in the key list, report it
                [{Pos, PathToNode}]
        end,
    {BelowResults, KeysAfterBelow} =
        get_full_key_paths(Pos + 1, Children, KeysWithoutNode, PathToNode),
    {SiblingResults, KeysLeft} =
        get_full_key_paths(Pos, Siblings, KeysAfterBelow, KeyPathAcc),
    {NodeResult ++ BelowResults ++ SiblingResults, KeysLeft}.

%% Every leaf of every tree as {LeafPos, [{Key, Value}, ...]}, with the path
%% listed leaf first. Leafs of later trees come first in the result.
get_all_leafs_full(Trees) ->
    lists:foldl(
        fun({Pos, Tree}, Acc) ->
            get_all_leafs_full_simple(Pos, [Tree], []) ++ Acc
        end, [], Trees).

get_all_leafs_full_simple(_Pos, [], _KeyPathAcc) ->
    [];
get_all_leafs_full_simple(Pos, [{KeyId, Value, []} | Siblings], KeyPathAcc) ->
    Leaf = {Pos, [{KeyId, Value} | KeyPathAcc]},
    [Leaf | get_all_leafs_full_simple(Pos, Siblings, KeyPathAcc)];
get_all_leafs_full_simple(Pos, [{KeyId, Value, Children} | Siblings],
        KeyPathAcc) ->
    Below = get_all_leafs_full_simple(
        Pos + 1, Children, [{KeyId, Value} | KeyPathAcc]),
    Below ++ get_all_leafs_full_simple(Pos, Siblings, KeyPathAcc).

%% Every leaf of every tree in the {Value, {Pos, KeyPath}} shape produced by
%% get_all_leafs_simple/3. Leafs of later trees come first in the result.
get_all_leafs(Trees) ->
    lists:foldl(
        fun({Pos, Tree}, Acc) ->
            get_all_leafs_simple(Pos, [Tree], []) ++ Acc
        end, [], Trees).
%% Collect the leafs below a sibling list as {Value, {Pos, KeyPath}}, where
%% KeyPath lists the keys from the leaf back towards the root.
get_all_leafs_simple(_Pos, [], _KeyPathAcc) ->
    [];
get_all_leafs_simple(Pos, [{KeyId, Value, []} | Siblings], KeyPathAcc) ->
    Leaf = {Value, {Pos, [KeyId | KeyPathAcc]}},
    [Leaf | get_all_leafs_simple(Pos, Siblings, KeyPathAcc)];
get_all_leafs_simple(Pos, [{KeyId, _Value, Children} | Siblings], KeyPathAcc) ->
    Below = get_all_leafs_simple(Pos + 1, Children, [KeyId | KeyPathAcc]),
    Below ++ get_all_leafs_simple(Pos, Siblings, KeyPathAcc).


%% Number of leaf nodes across all trees.
count_leafs([]) ->
    0;
count_leafs([{_Pos, Tree} | Rest]) ->
    count_leafs_simple([Tree]) + count_leafs(Rest).

count_leafs_simple([]) ->
    0;
count_leafs_simple([{_Key, _Value, []} | Siblings]) ->
    1 + count_leafs_simple(Siblings);
count_leafs_simple([{_Key, _Value, Children} | Siblings]) ->
    count_leafs_simple(Children) + count_leafs_simple(Siblings).


%% Fold Fun({Pos, Key}, Value, leaf | branch, Acc) over every node. A node is
%% visited before its children, and its children before its later siblings.
fold(_Fun, Acc, []) ->
    Acc;
fold(Fun, Acc, [{Pos, Tree} | Rest]) ->
    fold(Fun, fold_simple(Fun, Acc, Pos, [Tree]), Rest).

fold_simple(_Fun, Acc, _Pos, []) ->
    Acc;
fold_simple(Fun, Acc, Pos, [{Key, Value, Children} | Siblings]) ->
    NodeType =
        case Children of
            [] -> leaf;
            _ -> branch
        end,
    AfterNode = Fun({Pos, Key}, Value, NodeType, Acc),
    AfterChildren = fold_simple(Fun, AfterNode, Pos + 1, Children),
    fold_simple(Fun, AfterChildren, Pos, Siblings).


%% Replace every node value with the result of Fun. Fun may take
%% ({Pos, Key}, Value) or ({Pos, Key}, Value, leaf | branch); any other arity
%% is an error.
map(_Fun, []) ->
    [];
map(Fun, [{Pos, Tree} | Rest]) ->
    NodeFun =
        case erlang:fun_info(Fun, arity) of
            {arity, 2} ->
                fun(RevId, Value, _NodeType) -> Fun(RevId, Value) end;
            {arity, 3} ->
                Fun
        end,
    [NewTree] = map_simple(NodeFun, Pos, [Tree]),
    [{Pos, NewTree} | map(Fun, Rest)].

map_simple(_Fun, _Pos, []) ->
    [];
map_simple(Fun, Pos, [{Key, Value, Children} | Siblings]) ->
    NodeType =
        case Children of
            [] -> leaf;
            _ -> branch
        end,
    NewValue = Fun({Pos, Key}, Value, NodeType),
    NewChildren = map_simple(Fun, Pos + 1, Children),
    [{Key, NewValue, NewChildren} | map_simple(Fun, Pos, Siblings)].
%% Map and fold in a single pass. Fun({Pos, Key}, Value, leaf | branch, Acc)
%% must return {NewValue, NewAcc}. A node is visited before its children, and
%% its children before its later siblings.
mapfold(_Fun, Acc, []) ->
    {[], Acc};
mapfold(Fun, Acc, [{Pos, Tree} | Rest]) ->
    {[NewTree], AfterTree} = mapfold_simple(Fun, Acc, Pos, [Tree]),
    {NewRest, FinalAcc} = mapfold(Fun, AfterTree, Rest),
    {[{Pos, NewTree} | NewRest], FinalAcc}.

mapfold_simple(_Fun, Acc, _Pos, []) ->
    {[], Acc};
mapfold_simple(Fun, Acc, Pos, [{Key, Value, Children} | Siblings]) ->
    NodeType =
        case Children of
            [] -> leaf;
            _ -> branch
        end,
    {NewValue, AfterNode} = Fun({Pos, Key}, Value, NodeType, Acc),
    {NewChildren, AfterChildren} =
        mapfold_simple(Fun, AfterNode, Pos + 1, Children),
    {NewSiblings, FinalAcc} =
        mapfold_simple(Fun, AfterChildren, Pos, Siblings),
    {[{Key, NewValue, NewChildren} | NewSiblings], FinalAcc}.


%% Apply Fun({Pos, Key}, Value) to leaf values only; inner nodes keep their
%% values.
map_leafs(_Fun, []) ->
    [];
map_leafs(Fun, [{Pos, Tree} | Rest]) ->
    [NewTree] = map_leafs_simple(Fun, Pos, [Tree]),
    [{Pos, NewTree} | map_leafs(Fun, Rest)].

map_leafs_simple(_Fun, _Pos, []) ->
    [];
map_leafs_simple(Fun, Pos, [{Key, Value, []} | Siblings]) ->
    NewValue = Fun({Pos, Key}, Value),
    [{Key, NewValue, []} | map_leafs_simple(Fun, Pos, Siblings)];
map_leafs_simple(Fun, Pos, [{Key, Value, Children} | Siblings]) ->
    NewChildren = map_leafs_simple(Fun, Pos + 1, Children),
    [{Key, Value, NewChildren} | map_leafs_simple(Fun, Pos, Siblings)].


%% Cut every branch down to at most Limit revisions, counted from the leaf,
%% and rebuild the trees from the shortened paths.
stem(Trees, Limit) ->
    % flatten each branch into a leaf-first path, trim it, and key it by the
    % start position of what is left; then sort by that start position
    Trim = fun({LeafPos, Path}) ->
        Kept = lists:sublist(Path, Limit),
        {LeafPos + 1 - length(Kept), Kept}
    end,
    Paths = lists:sort(lists:map(Trim, get_all_leafs_full(Trees))),

    % convert the paths back to trees, merging them one at a time
    WrapNode = fun({K, V}, Children) -> [{K, V, Children}] end,
    Graft = fun({StartPos, Path}, TreeAcc) ->
        [Branch] = lists:foldl(WrapNode, [], Path),
        {Merged, _} = merge(TreeAcc, {StartPos, Branch}),
        Merged
    end,
    lists:foldl(Graft, [], Paths).
%% Choose which of two values to keep when the same node appears in both
%% trees. A 3- or 4-element tuple wins (ours first, then theirs); otherwise a
%% ?REV_MISSING placeholder yields to the other value; otherwise ours is kept.
value_pref({_, _, _} = Ours, _) ->
    Ours;
value_pref({_, _, _, _} = Ours, _) ->
    Ours;
value_pref(_, {_, _, _} = Theirs) ->
    Theirs;
value_pref(_, {_, _, _, _} = Theirs) ->
    Theirs;
value_pref(?REV_MISSING, Other) ->
    Other;
value_pref(Other, ?REV_MISSING) ->
    Other;
value_pref(Last, _) ->
    Last.


% Tests moved to test/etap/06?-*.t

% ---------------------------------------------------------------------------
% Next file in the same commit (patch header preserved from the commit mail):
% http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_log.erl
% ----------------------------------------------------------------------
% diff --git a/src/couch/src/couch_log.erl b/src/couch/src/couch_log.erl
% deleted file mode 100644
% index d1aa701..0000000
% --- a/src/couch/src/couch_log.erl
% +++ /dev/null
% @@ -1,263 +0,0 @@
% ---------------------------------------------------------------------------

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_log).
-behaviour(gen_event).
-behaviour(config_listener).

% public API
-export([start_link/0, stop/0]).
-export([debug/2, info/2, warn/2, error/2]).
-export([debug_on/0, info_on/0, warn_on/0, get_level/0, get_level_integer/0, set_level/1]).
-export([debug_on/1, info_on/1, warn_on/1, get_level/1, get_level_integer/1, set_level/2]).
-export([read/2]).

% gen_event callbacks
-export([init/1, handle_event/2, terminate/2, code_change/3]).
-export([handle_info/2, handle_call/2]).

% config_listener api
-export([handle_config_change/5]).

% Numeric log levels: a lower number is more verbose.
-define(LEVEL_ERROR, 4).
-define(LEVEL_WARN, 3).
-define(LEVEL_INFO, 2).
-define(LEVEL_DEBUG, 1).
% Handler state: the open log file descriptor, the global level and whether
% error_logger `error' / `error_report' events are written as well.
-record(state, {
    fd,
    level,
    sasl
}).

% Public logging calls. The message is formatted in the calling process and
% then delivered synchronously to the error_logger event manager.
debug(Format, Args) ->
    notify_logger(debug, couch_debug, Format, Args).

info(Format, Args) ->
    notify_logger(info, couch_info, Format, Args).

warn(Format, Args) ->
    notify_logger(warn, couch_warn, Format, Args).

error(Format, Args) ->
    notify_logger(error, couch_error, Format, Args).

% Shared body of the four logging calls: Level goes into the message text,
% Tag is the event tag matched by handle_event/2.
notify_logger(Level, Tag, Format, Args) ->
    {ConsoleMsg, FileMsg} = get_log_messages(self(), Level, Format, Args),
    gen_event:sync_notify(error_logger, {Tag, ConsoleMsg, FileMsg}).


% Level atom -> numeric level.
level_integer(error) -> ?LEVEL_ERROR;
level_integer(warn)  -> ?LEVEL_WARN;
level_integer(info)  -> ?LEVEL_INFO;
level_integer(debug) -> ?LEVEL_DEBUG;
level_integer(_Else) -> ?LEVEL_ERROR. % anything else default to ERROR level

% Numeric level -> level atom. Unknown integers are not handled.
level_atom(?LEVEL_ERROR) -> error;
level_atom(?LEVEL_WARN)  -> warn;
level_atom(?LEVEL_INFO)  -> info;
level_atom(?LEVEL_DEBUG) -> debug.


% Installs this module as an error_logger handler through couch_event_sup,
% registered locally as couch_log.
start_link() ->
    couch_event_sup:start_link({local, couch_log}, error_logger, couch_log, []).

stop() ->
    couch_event_sup:stop(couch_log).
% gen_event init callback. Subscribes to configuration changes, loads the
% global and per-module levels into the ?MODULE ets table (created on first
% use) and opens the log file for appending.
%
% Level names come from configuration as strings. They are mapped through
% config_level_integer/1 instead of list_to_atom/1 so that arbitrary
% configuration text never creates new atoms; the resulting levels are the
% same as before.
init([]) ->
    ok = config:listen_for_changes(?MODULE, nil),

    Filename = config:get("log", "file", "couchdb.log"),
    Level = config_level_integer(config:get("log", "level", "info")),
    Sasl = config:get("log", "include_sasl", "true") =:= "true",
    LevelByModule = config:get("log_level_by_module"),

    case ets:info(?MODULE) of
        undefined -> ets:new(?MODULE, [named_table]);
        _ -> ok
    end,
    ets:insert(?MODULE, {level, Level}),
    % per-module entries are keyed by the module name as a string
    lists:foreach(fun({Module, ModuleLevel}) ->
        ets:insert(?MODULE, {Module, config_level_integer(ModuleLevel)})
    end, LevelByModule),

    case file:open(Filename, [append]) of
        {ok, Fd} ->
            {ok, #state{fd = Fd, level = Level, sasl = Sasl}};
        {error, Reason} ->
            ReasonStr = file:format_error(Reason),
            io:format("Error opening log file ~s: ~s", [Filename, ReasonStr]),
            {stop, {error, ReasonStr, Filename}}
    end.

% Map a configured level name to its numeric level without creating atoms.
% Unknown names fall through to level_integer/1's catch-all.
config_level_integer("error") -> level_integer(error);
config_level_integer("warn")  -> level_integer(warn);
config_level_integer("info")  -> level_integer(info);
config_level_integer("debug") -> level_integer(debug);
config_level_integer(_Other)  -> level_integer(undefined).

% config_listener callback. A change to any setting read in init/1 stops the
% logger and removes this listener; everything else is ignored.
handle_config_change("log", "file", _, _, _) ->
    ?MODULE:stop(),
    remove_handler;
handle_config_change("log", "level", _, _, _) ->
    ?MODULE:stop(),
    remove_handler;
handle_config_change("log", "include_sasl", _, _, _) ->
    ?MODULE:stop(),
    remove_handler;
handle_config_change(_, _, _, _, _) ->
    {ok, nil}.

% Level predicates: true when the effective level is at least as verbose as
% the named level (lower numbers are more verbose).
debug_on() ->
    get_level_integer() =< ?LEVEL_DEBUG.

info_on() ->
    get_level_integer() =< ?LEVEL_INFO.

warn_on() ->
    get_level_integer() =< ?LEVEL_WARN.

debug_on(Module) ->
    get_level_integer(Module) =< ?LEVEL_DEBUG.

info_on(Module) ->
    get_level_integer(Module) =< ?LEVEL_INFO.

warn_on(Module) ->
    get_level_integer(Module) =< ?LEVEL_WARN.

% Set the global or per-module level from a level atom.
set_level(LevelAtom) ->
    set_level_integer(level_integer(LevelAtom)).

set_level(Module, LevelAtom) ->
    set_level_integer(Module, level_integer(LevelAtom)).

% Current global or per-module level as an atom.
get_level() ->
    level_atom(get_level_integer()).

get_level(Module) ->
    level_atom(get_level_integer(Module)).
% Global numeric level from the ets table; ERROR when the table or the entry
% does not exist.
get_level_integer() ->
    try
        ets:lookup_element(?MODULE, level, 2)
    catch error:badarg ->
        ?LEVEL_ERROR
    end.

% Numeric level for one module (given as an atom), falling back to the global
% level when there is no per-module entry or the table does not exist.
% Per-module entries are keyed by the module name as a string.
get_level_integer(Module0) ->
    Module = atom_to_list(Module0),
    try ets:lookup(?MODULE, Module) of
        [{_Module, Level}] -> Level;
        _ -> get_level_integer()
    catch error:_ ->
        get_level_integer()
    end.

set_level_integer(Int) ->
    gen_event:call(error_logger, couch_log, {set_level_integer, Int}).

set_level_integer(Module, Int) ->
    gen_event:call(error_logger, couch_log, {set_level_integer, Module, Int}).

% gen_event callback. couch_* events carry pre-formatted console and file
% messages. error_logger `error_report' and `error' events are formatted here,
% and only when sasl logging is enabled. Everything else is dropped.
handle_event({Tag, ConMsg, FileMsg}, State)
        when Tag =:= couch_error; Tag =:= couch_warn;
             Tag =:= couch_info; Tag =:= couch_debug ->
    log(State, ConMsg, FileMsg),
    {ok, State};
handle_event({error_report, _, {Pid, _, _}}=Event, #state{sasl = true} = St) ->
    {ConMsg, FileMsg} = get_log_messages(Pid, error, "~p", [Event]),
    log(St, ConMsg, FileMsg),
    {ok, St};
handle_event({error, _, {Pid, Format, Args}}, #state{sasl = true} = State) ->
    {ConMsg, FileMsg} = get_log_messages(Pid, error, Format, Args),
    log(State, ConMsg, FileMsg),
    {ok, State};
handle_event(_Event, State) ->
    {ok, State}.

% gen_event callback for the set_level_integer requests.
handle_call({set_level_integer, NewLevel}, State) ->
    ets:insert(?MODULE, {level, NewLevel}),
    {ok, ok, State#state{level = NewLevel}};

handle_call({set_level_integer, Module, NewLevel}, State) ->
    % Store under the same string key that init/1 writes and that
    % get_level_integer/1 looks up, otherwise the entry is never found.
    % A per-module change must not touch the global level kept in the state.
    ets:insert(?MODULE, {module_key(Module), NewLevel}),
    {ok, ok, State}.

% Per-module ets key: module names are stored as strings.
module_key(Module) when is_atom(Module) -> atom_to_list(Module);
module_key(Module) -> Module.

% When the config listener exits, re-subscribe after five seconds.
handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
    erlang:send_after(5000, self(), restart_config_listener),
    {ok, State};
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {ok, State};
handle_info(_Info, State) ->
    {ok, State}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% Close the log file when the handler is removed.
terminate(_Arg, #state{fd = Fd}) ->
    file:close(Fd).

% Write the console message to standard output and the file message to the
% log file; either write failing crashes the handler.
log(#state{fd = Fd}, ConsoleMsg, FileMsg) ->
    ok = io:put_chars(ConsoleMsg),
    ok = io:put_chars(Fd, FileMsg).

% Build both renderings of a log line. The console message is
% "[Level] [Pid] <formatted text>\n"; the file message is the same line
% prefixed with an RFC 1123 date. Both are returned as binaries.
get_log_messages(Pid, Level, Format, Args) ->
    ConsoleMsg = unicode:characters_to_binary(io_lib:format(
        "[~s] [~p] " ++ Format ++ "~n", [Level, Pid | Args])),
    FileMsg = ["[", couch_util:rfc1123_date(), "] ", ConsoleMsg],
    {ConsoleMsg, iolist_to_binary(FileMsg)}.


% Read Bytes bytes from the end of log file, jumping Offset bytes towards
% the beginning of the file first.
%
%  Log File    FilePos
%  ----------
% |          |  10
% |          |  20
% |          |  30
% |          |  40
% |          |  50
% |          |  60
% |          |  70 -- Bytes = 20  --
% |          |  80                 | Chunk
% |          |  90 -- Offset = 10 --
% |__________| 100
%
% Throws {bad_request, Message} when Bytes exceeds the configured
% httpd/log_max_chunk_size. Returns the chunk as a string, or an empty
% string when there is nothing to read at the computed position.

read(Bytes, Offset) ->
    % same default file name as init/1, so an unset option cannot yield
    % `undefined' here
    LogFileName = config:get("log", "file", "couchdb.log"),
    LogFileSize = filelib:file_size(LogFileName),
    MaxChunkSize = list_to_integer(
        config:get("httpd", "log_max_chunk_size", "1000000")),
    case Bytes > MaxChunkSize of
        true ->
            throw({bad_request, "'bytes' cannot exceed " ++
                integer_to_list(MaxChunkSize)});
        false ->
            ok
    end,

    {ok, Fd} = file:open(LogFileName, [read]),
    Start = lists:max([LogFileSize - Bytes - Offset, 0]),

    % TODO: truncate chopped first line
    % TODO: make streaming

    try file:pread(Fd, Start, Bytes) of
        {ok, Chunk} -> Chunk;
        eof -> []
    after
        % always release the descriptor, including when pread fails; the
        % close result is ignored so it cannot mask the original error
        _ = file:close(Fd)
    end.

% ---------------------------------------------------------------------------
% Next file in the same commit (patch header preserved from the commit mail):
% http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_lru.erl
% ----------------------------------------------------------------------
% diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
% deleted file mode 100644
% index cc751b0..0000000
% --- a/src/couch/src/couch_lru.erl
% +++ /dev/null
% @@ -1,48 +0,0 @@
% ---------------------------------------------------------------------------

-module(couch_lru).
-export([new/0, insert/2, update/2, close/1]).

-include_lib("couch/include/couch_db.hrl").

% An empty cache: a gb_tree keyed by timestamp and a dict from database name
% to that database's current timestamp.
new() ->
    {gb_trees:empty(), dict:new()}.
% Record DbName as used now: the timestamp becomes its key in the tree and
% its value in the dict.
insert(DbName, {Tree0, Dict0}) ->
    Stamp = now(),
    Tree = gb_trees:insert(Stamp, DbName, Tree0),
    Dict = dict:store(DbName, Stamp, Dict0),
    {Tree, Dict}.

% Move DbName to a fresh timestamp. Unknown names leave the cache unchanged.
update(DbName, {Tree0, Dict0} = Cache) ->
    case dict:find(DbName, Dict0) of
        {ok, OldStamp} ->
            NewStamp = now(),
            Pruned = gb_trees:delete(OldStamp, Tree0),
            {gb_trees:insert(NewStamp, DbName, Pruned),
                dict:store(DbName, NewStamp, Dict0)};
        error ->
            % We closed this database before processing the update.  Ignore
            Cache
    end.

% Close one idle database, trying entries in ascending timestamp order.
% Raises all_dbs_active when no entry can be closed.
close({Tree, _} = Cache) ->
    close_int(gb_trees:next(gb_trees:iterator(Tree)), Cache).

%% internals

close_int(none, _) ->
    erlang:error(all_dbs_active);
close_int({Stamp, DbName, Iter}, {Tree, Dict} = Cache) ->
    % mark the couch_dbs entry as locked before inspecting it
    case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
        true ->
            [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
            case couch_db:is_idle(Db) of
                true ->
                    true = ets:delete(couch_dbs, DbName),
                    exit(Pid, kill),
                    {gb_trees:delete(Stamp, Tree), dict:erase(DbName, Dict)};
                false ->
                    % still busy: undo the lock, refresh its timestamp and
                    % move on to the next candidate
                    true = ets:update_element(
                        couch_dbs, DbName, {#db.fd_monitor, nil}),
                    close_int(gb_trees:next(Iter), update(DbName, Cache))
            end;
        false ->
            % no couch_dbs entry was updated: drop the cache entry and
            % restart the scan on the pruned tree
            Pruned = gb_trees:delete(Stamp, Tree),
            PrunedIter = gb_trees:iterator(Pruned),
            close_int(gb_trees:next(PrunedIter),
                {Pruned, dict:erase(DbName, Dict)})
    end.

% ---------------------------------------------------------------------------
% Next file in the same commit (patch header preserved from the commit mail):
% http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_native_process.erl
% ----------------------------------------------------------------------
% diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl
% deleted file mode 100644
% index 8ca56f0..0000000
% --- a/src/couch/src/couch_native_process.erl
% +++ /dev/null
% @@ -1,422 +0,0 @@
% ---------------------------------------------------------------------------

% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
%
% You may obtain a copy of the License at
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
% either express or implied.
%
% See the License for the specific language governing permissions
% and limitations under the License.
%
% This file drew much inspiration from erlview, which was written by and
% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
%
%
% This module provides the smallest possible native view-server.
% With this module in-place, you can add the following to your couch INI files:
%  [native_query_servers]
%  erlang={couch_native_process, start_link, []}
%
% Which will then allow following example map function to be used:
%
%  fun({Doc}) ->
%    % Below, we emit a single record - the _id as key, null as value
%    DocId = couch_util:get_value(<<"_id">>, Doc, null),
%    Emit(DocId, null)
%  end.
%
% which should be roughly the same as the javascript:
%    emit(doc._id, null);
%
% This module exposes enough functions such that a native erlang server can
% act as a fully-fledged view server, but no 'helper' functions specifically
% for simplifying your erlang view code.  It is expected other third-party
% extensions will evolve which offer useful layers on top of this view server
% to help simplify your view code.
-module(couch_native_process).
-behaviour(gen_server).

-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
         handle_info/2]).
-export([set_timeout/2, prompt/2, prompt_many/2]).

-define(STATE, native_proc_state).
% Per-server state; every field except ddocs has a default.
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).

-include_lib("couch/include/couch_db.hrl").

% Start an unregistered native query server process.
start_link() ->
    gen_server:start_link(?MODULE, [], []).
- -% this is a bit messy, see also couch_query_servers handle_info -% stop(_Pid) -> -% ok. - -set_timeout(Pid, TimeOut) -> - gen_server:call(Pid, {set_timeout, TimeOut}). - -prompt(Pid, Data) when is_list(Data) -> - gen_server:call(Pid, {prompt, Data}). - -prompt_many(Pid, DataList) -> - prompt_many(Pid, DataList, []). - -prompt_many(_Pid, [], Acc) -> - {ok, lists:reverse(Acc)}; -prompt_many(Pid, [Data | Rest], Acc) -> - Result = prompt(Pid, Data), - prompt_many(Pid, Rest, [Result | Acc]). - -% gen_server callbacks -init([]) -> - {ok, #evstate{ddocs=dict:new()}}. - -handle_call({set_timeout, TimeOut}, _From, State) -> - {reply, ok, State#evstate{timeout=TimeOut}}; - -handle_call({prompt, Data}, _From, State) -> - ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]), - {NewState, Resp} = try run(State, to_binary(Data)) of - {S, R} -> {S, R} - catch - throw:{error, Why} -> - {State, [<<"error">>, Why, Why]} - end, - - case Resp of - {error, Reason} -> - Msg = io_lib:format("couch native server error: ~p", [Reason]), - {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState}; - [<<"error">> | Rest] -> - % Msg = io_lib:format("couch native server error: ~p", [Rest]), - % TODO: markh? (jan) - {reply, [<<"error">> | Rest], NewState}; - [<<"fatal">> | Rest] -> - % Msg = io_lib:format("couch native server error: ~p", [Rest]), - % TODO: markh? (jan) - {stop, fatal, [<<"error">> | Rest], NewState}; - Resp -> - {reply, Resp, NewState} - end. - -handle_cast(garbage_collect, State) -> - erlang:garbage_collect(), - {noreply, State}; -handle_cast(foo, State) -> {noreply, State}. - -handle_info({'EXIT',_,normal}, State) -> {noreply, State}; -handle_info({'EXIT',_,Reason}, State) -> - {stop, Reason, State}. -terminate(_Reason, _State) -> ok. -code_change(_OldVersion, State, _Extra) -> {ok, State}. - -run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> - Pid ! 
{self(), list_row, Row}, - receive - {Pid, chunks, Data} -> - {State, [<<"chunks">>, Data]}; - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, [<<"end">>, Data]} - after State#evstate.timeout -> - throw({timeout, list_row}) - end; -run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> - Pid ! {self(), list_end}, - Resp = - receive - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - [<<"end">>, Data] - after State#evstate.timeout -> - throw({timeout, list_end}) - end, - process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, Resp}; -run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) -> - {State, [<<"error">>, list_error, list_error]}; -run(#evstate{ddocs=DDocs}, [<<"reset">>]) -> - {#evstate{ddocs=DDocs}, true}; -run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) -> - {#evstate{ddocs=DDocs, query_config=QueryConfig}, true}; -run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> - FunInfo = makefun(State, BinFunc), - {State#evstate{funs=Funs ++ [FunInfo]}, true}; -run(State, [<<"map_doc">> , Doc]) -> - Resp = lists:map(fun({Sig, Fun}) -> - erlang:put(Sig, []), - Fun(Doc), - lists:reverse(erlang:get(Sig)) - end, State#evstate.funs), - {State, Resp}; -run(State, [<<"reduce">>, Funs, KVs]) -> - {Keys, Vals} = - lists:foldl(fun([K, V], {KAcc, VAcc}) -> - {[K | KAcc], [V | VAcc]} - end, {[], []}, KVs), - Keys2 = lists:reverse(Keys), - Vals2 = lists:reverse(Vals), - {State, catch reduce(State, Funs, Keys2, Vals2, false)}; -run(State, [<<"rereduce">>, Funs, Vals]) -> - {State, catch reduce(State, Funs, null, Vals, true)}; -run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> - DDocs2 = store_ddoc(DDocs, DDocId, DDoc), - 
{State#evstate{ddocs=DDocs2}, true}; -run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) -> - DDoc = load_ddoc(DDocs, DDocId), - ddoc(State, DDoc, Rest); -run(_, Unknown) -> - ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]), - throw({error, unknown_command}). - -ddoc(State, {DDoc}, [FunPath, Args]) -> - % load fun from the FunPath - BFun = lists:foldl(fun - (Key, {Props}) when is_list(Props) -> - couch_util:get_value(Key, Props, nil); - (_Key, Fun) when is_binary(Fun) -> - Fun; - (_Key, nil) -> - throw({error, not_found}); - (_Key, _Fun) -> - throw({error, malformed_ddoc}) - end, {DDoc}, FunPath), - ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args). - -ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) -> - {State, (catch apply(Fun, Args))}; -ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) -> - FilterFunWrapper = fun(Doc) -> - case catch Fun(Doc, Req) of - true -> true; - false -> false; - {'EXIT', Error} -> ?LOG_ERROR("~p", [Error]) - end - end, - Resp = lists:map(FilterFunWrapper, Docs), - {State, [true, Resp]}; -ddoc(State, {_, Fun}, [<<"shows">>|_], Args) -> - Resp = case (catch apply(Fun, Args)) of - FunResp when is_list(FunResp) -> - FunResp; - {FunResp} -> - [<<"resp">>, {FunResp}]; - FunResp -> - FunResp - end, - {State, Resp}; -ddoc(State, {_, Fun}, [<<"updates">>|_], Args) -> - Resp = case (catch apply(Fun, Args)) of - [JsonDoc, JsonResp] -> - [<<"up">>, JsonDoc, JsonResp] - end, - {State, Resp}; -ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> - Self = self(), - SpawnFun = fun() -> - LastChunk = (catch apply(Fun, Args)), - case start_list_resp(Self, Sig) of - started -> - receive - {Self, list_row, _Row} -> ignore; - {Self, list_end} -> ignore - after State#evstate.timeout -> - throw({timeout, list_cleanup_pid}) - end; - _ -> - ok - end, - LastChunks = - case erlang:get(Sig) of - undefined -> [LastChunk]; - OtherChunks -> [LastChunk | OtherChunks] - end, - Self ! 
{self(), list_end, lists:reverse(LastChunks)} - end, - erlang:put(do_trap, process_flag(trap_exit, true)), - Pid = spawn_link(SpawnFun), - Resp = - receive - {Pid, start, Chunks, JsonResp} -> - [<<"start">>, Chunks, JsonResp] - after State#evstate.timeout -> - throw({timeout, list_start}) - end, - {State#evstate{list_pid=Pid}, Resp}. - -store_ddoc(DDocs, DDocId, DDoc) -> - dict:store(DDocId, DDoc, DDocs). -load_ddoc(DDocs, DDocId) -> - try dict:fetch(DDocId, DDocs) of - {DDoc} -> {DDoc} - catch - _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))}) - end. - -bindings(State, Sig) -> - bindings(State, Sig, nil). -bindings(State, Sig, DDoc) -> - Self = self(), - - Log = fun(Msg) -> - ?LOG_INFO(Msg, []) - end, - - Emit = fun(Id, Value) -> - Curr = erlang:get(Sig), - erlang:put(Sig, [[Id, Value] | Curr]) - end, - - Start = fun(Headers) -> - erlang:put(list_headers, Headers) - end, - - Send = fun(Chunk) -> - Curr = - case erlang:get(Sig) of - undefined -> []; - Else -> Else - end, - erlang:put(Sig, [Chunk | Curr]) - end, - - GetRow = fun() -> - case start_list_resp(Self, Sig) of - started -> - ok; - _ -> - Chunks = - case erlang:get(Sig) of - undefined -> []; - CurrChunks -> CurrChunks - end, - Self ! {self(), chunks, lists:reverse(Chunks)} - end, - erlang:put(Sig, []), - receive - {Self, list_row, Row} -> Row; - {Self, list_end} -> nil - after State#evstate.timeout -> - throw({timeout, list_pid_getrow}) - end - end, - - FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, - - Bindings = [ - {'Log', Log}, - {'Emit', Emit}, - {'Start', Start}, - {'Send', Send}, - {'GetRow', GetRow}, - {'FoldRows', FoldRows} - ], - case DDoc of - {_Props} -> - Bindings ++ [{'DDoc', DDoc}]; - _Else -> Bindings - end. 
- -% thanks to erlview, via: -% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html -makefun(State, Source) -> - Sig = couch_util:md5(Source), - BindFuns = bindings(State, Sig), - {Sig, makefun(State, Source, BindFuns)}. -makefun(State, Source, {DDoc}) -> - Sig = couch_util:md5(lists:flatten([Source, term_to_binary(DDoc)])), - BindFuns = bindings(State, Sig, {DDoc}), - {Sig, makefun(State, Source, BindFuns)}; -makefun(_State, Source, BindFuns) when is_list(BindFuns) -> - FunStr = binary_to_list(Source), - {ok, Tokens, _} = erl_scan:string(FunStr), - Form = case (catch erl_parse:parse_exprs(Tokens)) of - {ok, [ParsedForm]} -> - ParsedForm; - {error, {LineNum, _Mod, [Mesg, Params]}}=Error -> - io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]), - io:format(standard_error, "~s~p~n", [Mesg, Params]), - throw(Error) - end, - Bindings = lists:foldl(fun({Name, Fun}, Acc) -> - erl_eval:add_binding(Name, Fun, Acc) - end, erl_eval:new_bindings(), BindFuns), - {value, Fun, _} = erl_eval:expr(Form, Bindings), - Fun. - -reduce(State, BinFuns, Keys, Vals, ReReduce) -> - Funs = case is_list(BinFuns) of - true -> - lists:map(fun(BF) -> makefun(State, BF) end, BinFuns); - _ -> - [makefun(State, BinFuns)] - end, - Reds = lists:map(fun({_Sig, Fun}) -> - Fun(Keys, Vals, ReReduce) - end, Funs), - [true, Reds]. - -foldrows(GetRow, ProcRow, Acc) -> - case GetRow() of - nil -> - {ok, Acc}; - Row -> - case (catch ProcRow(Row, Acc)) of - {ok, Acc2} -> - foldrows(GetRow, ProcRow, Acc2); - {stop, Acc2} -> - {ok, Acc2} - end - end. - -start_list_resp(Self, Sig) -> - case erlang:get(list_started) of - undefined -> - Headers = - case erlang:get(list_headers) of - undefined -> {[{<<"headers">>, {[]}}]}; - CurrHdrs -> CurrHdrs - end, - Chunks = - case erlang:get(Sig) of - undefined -> []; - CurrChunks -> CurrChunks - end, - Self ! 
{self(), start, lists:reverse(Chunks), Headers}, - erlang:put(list_started, true), - erlang:put(Sig, []), - started; - _ -> - ok - end. - -to_binary({Data}) -> - Pred = fun({Key, Value}) -> - {to_binary(Key), to_binary(Value)} - end, - {lists:map(Pred, Data)}; -to_binary(Data) when is_list(Data) -> - [to_binary(D) || D <- Data]; -to_binary(null) -> - null; -to_binary(true) -> - true; -to_binary(false) -> - false; -to_binary(Data) when is_atom(Data) -> - list_to_binary(atom_to_list(Data)); -to_binary(Data) -> - Data. http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_os_daemons.erl ---------------------------------------------------------------------- diff --git a/src/couch/src/couch_os_daemons.erl b/src/couch/src/couch_os_daemons.erl deleted file mode 100644 index 3560149..0000000 --- a/src/couch/src/couch_os_daemons.erl +++ /dev/null @@ -1,377 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. --module(couch_os_daemons). --behaviour(gen_server). --behaviour(config_listener). - --export([start_link/0, info/0, info/1]). - --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - -% config_listener api --export([handle_config_change/5]). - --include_lib("couch/include/couch_db.hrl"). - --record(daemon, { - port, - name, - cmd, - kill, - status=running, - cfg_patterns=[], - errors=[], - buf=[] -}). - --define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]). 
--define(TIMEOUT, 5000). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -info() -> - info([]). - -info(Options) -> - gen_server:call(?MODULE, {daemon_info, Options}). - -init(_) -> - process_flag(trap_exit, true), - ok = config:listen_for_changes(?MODULE, nil), - Table = ets:new(?MODULE, [protected, set, {keypos, #daemon.port}]), - reload_daemons(Table), - {ok, Table}. - -terminate(_Reason, Table) -> - [stop_port(D) || D <- ets:tab2list(Table)], - ok. - -handle_call({daemon_info, Options}, _From, Table) when is_list(Options) -> - case lists:member(table, Options) of - true -> - {reply, {ok, ets:tab2list(Table)}, Table}; - _ -> - {reply, {ok, Table}, Table} - end; -handle_call(Msg, From, Table) -> - ?LOG_ERROR("Unknown call message to ~p from ~p: ~p", [?MODULE, From, Msg]), - {stop, error, Table}. - -handle_cast({config_change, Sect, Key}, Table) -> - restart_daemons(Table, Sect, Key), - case Sect of - "os_daemons" -> reload_daemons(Table); - _ -> ok - end, - {noreply, Table}; -handle_cast(stop, Table) -> - {stop, normal, Table}; -handle_cast(Msg, Table) -> - ?LOG_ERROR("Unknown cast message to ~p: ~p", [?MODULE, Msg]), - {stop, error, Table}. 
- -handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> - erlang:send_after(5000, self(), restart_config_listener), - {noreply, State}; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}; -handle_info({'EXIT', Port, Reason}, Table) -> - case ets:lookup(Table, Port) of - [] -> - ?LOG_INFO("Port ~p exited after stopping: ~p~n", [Port, Reason]); - [#daemon{status=stopping}] -> - true = ets:delete(Table, Port); - [#daemon{name=Name, status=restarting}=D] -> - ?LOG_INFO("Daemon ~p restarting after config change.", [Name]), - true = ets:delete(Table, Port), - {ok, Port2} = start_port(D#daemon.cmd), - true = ets:insert(Table, D#daemon{ - port=Port2, status=running, kill=undefined, buf=[] - }); - [#daemon{name=Name, status=halted}] -> - ?LOG_ERROR("Halted daemon process: ~p", [Name]); - [D] -> - ?LOG_ERROR("Invalid port state at exit: ~p", [D]) - end, - {noreply, Table}; -handle_info({Port, closed}, Table) -> - handle_info({Port, {exit_status, closed}}, Table); -handle_info({Port, {exit_status, Status}}, Table) -> - case ets:lookup(Table, Port) of - [] -> - ?LOG_ERROR("Unknown port ~p exiting ~p", [Port, Status]), - {stop, {error, unknown_port_died, Status}, Table}; - [#daemon{name=Name, status=restarting}=D] -> - ?LOG_INFO("Daemon ~p restarting after config change.", [Name]), - true = ets:delete(Table, Port), - {ok, Port2} = start_port(D#daemon.cmd), - true = ets:insert(Table, D#daemon{ - port=Port2, status=running, kill=undefined, buf=[] - }), - {noreply, Table}; - [#daemon{status=stopping}=D] -> - % The configuration changed and this daemon is no - % longer needed. - ?LOG_DEBUG("Port ~p shut down.", [D#daemon.name]), - true = ets:delete(Table, Port), - {noreply, Table}; - [D] -> - % Port died for unknown reason. Check to see if it's - % died too many times or if we should boot it back up. - case should_halt([now() | D#daemon.errors]) of - {true, _} -> - % Halting the process. 
We won't try and reboot - % until the configuration changes. - Fmt = "Daemon ~p halted with exit_status ~p", - ?LOG_ERROR(Fmt, [D#daemon.name, Status]), - D2 = D#daemon{status=halted, errors=nil, buf=nil}, - true = ets:insert(Table, D2), - {noreply, Table}; - {false, Errors} -> - % We're guessing it was a random error, this daemon - % has behaved so we'll give it another chance. - Fmt = "Daemon ~p is being rebooted after exit_status ~p", - ?LOG_INFO(Fmt, [D#daemon.name, Status]), - true = ets:delete(Table, Port), - {ok, Port2} = start_port(D#daemon.cmd), - true = ets:insert(Table, D#daemon{ - port=Port2, status=running, kill=undefined, - errors=Errors, buf=[] - }), - {noreply, Table} - end; - _Else -> - throw(error) - end; -handle_info({Port, {data, {noeol, Data}}}, Table) -> - [#daemon{buf=Buf}=D] = ets:lookup(Table, Port), - true = ets:insert(Table, D#daemon{buf=[Data | Buf]}), - {noreply, Table}; -handle_info({Port, {data, {eol, Data}}}, Table) -> - [#daemon{buf=Buf}=D] = ets:lookup(Table, Port), - Line = lists:reverse(Buf, Data), - % The first line echoed back is the kill command - % for when we go to get rid of the port. Lines after - % that are considered part of the stdio API. - case D#daemon.kill of - undefined -> - true = ets:insert(Table, D#daemon{kill=?b2l(Line), buf=[]}); - _Else -> - D2 = case (catch ?JSON_DECODE(Line)) of - {invalid_json, Rejected} -> - ?LOG_ERROR("Ignoring OS daemon request: ~p", [Rejected]), - D; - JSON -> - {ok, D3} = handle_port_message(D, JSON), - D3 - end, - true = ets:insert(Table, D2#daemon{buf=[]}) - end, - {noreply, Table}; -handle_info({Port, Error}, Table) -> - ?LOG_ERROR("Unexpectd message from port ~p: ~p", [Port, Error]), - stop_port(Port), - [D] = ets:lookup(Table, Port), - true = ets:insert(Table, D#daemon{status=restarting, buf=nil}), - {noreply, Table}; -handle_info(Msg, Table) -> - ?LOG_ERROR("Unexpected info message to ~p: ~p", [?MODULE, Msg]), - {stop, error, Table}. 
- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -handle_config_change(Section, Key, _, _, _) -> - gen_server:cast(?MODULE, {config_change, Section, Key}), - {ok, nil}. - - -% Internal API - -% -% Port management helpers -% - -start_port(Command) -> - PrivDir = couch_util:priv_dir(), - Spawnkiller = filename:join(PrivDir, "couchspawnkillable"), - Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, ?PORT_OPTIONS), - {ok, Port}. - - -stop_port(#daemon{port=Port, kill=undefined}=D) -> - ?LOG_ERROR("Stopping daemon without a kill command: ~p", [D#daemon.name]), - catch port_close(Port); -stop_port(#daemon{port=Port}=D) -> - ?LOG_DEBUG("Stopping daemon: ~p", [D#daemon.name]), - os:cmd(D#daemon.kill), - catch port_close(Port). - - -handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section]) -> - KVs = config:get(Section), - Data = lists:map(fun({K, V}) -> {?l2b(K), ?l2b(V)} end, KVs), - Json = iolist_to_binary(?JSON_ENCODE({Data})), - port_command(Port, <<Json/binary, "\n">>), - {ok, Daemon}; -handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section, Key]) -> - Value = case config:get(Section, Key, null) of - null -> null; - String -> ?l2b(String) - end, - Json = iolist_to_binary(?JSON_ENCODE(Value)), - port_command(Port, <<Json/binary, "\n">>), - {ok, Daemon}; -handle_port_message(Daemon, [<<"register">>, Sec]) when is_binary(Sec) -> - Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [{?b2l(Sec)}]), - {ok, Daemon#daemon{cfg_patterns=Patterns}}; -handle_port_message(Daemon, [<<"register">>, Sec, Key]) - when is_binary(Sec) andalso is_binary(Key) -> - Pattern = {?b2l(Sec), ?b2l(Key)}, - Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [Pattern]), - {ok, Daemon#daemon{cfg_patterns=Patterns}}; -handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg]) -> - handle_log_message(Name, Msg, <<"info">>), - {ok, Daemon}; -handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg, {Opts}]) -> - Level = couch_util:get_value(<<"level">>, Opts, <<"info">>), -
handle_log_message(Name, Msg, Level), - {ok, Daemon}; -handle_port_message(#daemon{name=Name}=Daemon, Else) -> - ?LOG_ERROR("Daemon ~p made invalid request: ~p", [Name, Else]), - {ok, Daemon}. - - -handle_log_message(Name, Msg, _Level) when not is_binary(Msg) -> - ?LOG_ERROR("Invalid log message from daemon ~p: ~p", [Name, Msg]); -handle_log_message(Name, Msg, <<"debug">>) -> - ?LOG_DEBUG("Daemon ~p :: ~s", [Name, ?b2l(Msg)]); -handle_log_message(Name, Msg, <<"info">>) -> - ?LOG_INFO("Daemon ~p :: ~s", [Name, ?b2l(Msg)]); -handle_log_message(Name, Msg, <<"error">>) -> - ?LOG_ERROR("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]); -handle_log_message(Name, Msg, Level) -> - ?LOG_ERROR("Invalid log level from daemon: ~p", [Level]), - ?LOG_INFO("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]). - -% -% Daemon management helpers -% - -reload_daemons(Table) -> - % List of daemons we want to have running. - Configured = lists:sort(config:get("os_daemons")), - - % Remove records for daemons that were halted. - MSpecHalted = #daemon{name='$1', cmd='$2', status=halted, _='_'}, - Halted = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecHalted)]), - ok = stop_os_daemons(Table, find_to_stop(Configured, Halted, [])), - - % Stop daemons that are running - % Start newly configured daemons - MSpecRunning = #daemon{name='$1', cmd='$2', status=running, _='_'}, - Running = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecRunning)]), - ok = stop_os_daemons(Table, find_to_stop(Configured, Running, [])), - ok = boot_os_daemons(Table, find_to_boot(Configured, Running, [])), - ok. - - -restart_daemons(Table, Sect, Key) -> - restart_daemons(Table, Sect, Key, ets:first(Table)). 
- -restart_daemons(_, _, _, '$end_of_table') -> - ok; -restart_daemons(Table, Sect, Key, Port) -> - [D] = ets:lookup(Table, Port), - HasSect = lists:member({Sect}, D#daemon.cfg_patterns), - HasKey = lists:member({Sect, Key}, D#daemon.cfg_patterns), - case HasSect or HasKey of - true -> - stop_port(D), - D2 = D#daemon{status=restarting, buf=nil}, - true = ets:insert(Table, D2); - _ -> - ok - end, - restart_daemons(Table, Sect, Key, ets:next(Table, Port)). - - -stop_os_daemons(_Table, []) -> - ok; -stop_os_daemons(Table, [{Name, Cmd} | Rest]) -> - [[Port]] = ets:match(Table, #daemon{port='$1', name=Name, cmd=Cmd, _='_'}), - [D] = ets:lookup(Table, Port), - case D#daemon.status of - halted -> - ets:delete(Table, Port); - _ -> - stop_port(D), - D2 = D#daemon{status=stopping, errors=nil, buf=nil}, - true = ets:insert(Table, D2) - end, - stop_os_daemons(Table, Rest). - -boot_os_daemons(_Table, []) -> - ok; -boot_os_daemons(Table, [{Name, Cmd} | Rest]) -> - {ok, Port} = start_port(Cmd), - true = ets:insert(Table, #daemon{port=Port, name=Name, cmd=Cmd}), - boot_os_daemons(Table, Rest). - -% Elements unique to the configured set need to be booted. -find_to_boot([], _Rest, Acc) -> - % Nothing else configured. - Acc; -find_to_boot([D | R1], [D | R2], Acc) -> - % Elements are equal, daemon already running. - find_to_boot(R1, R2, Acc); -find_to_boot([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 -> - find_to_boot(R1, A2, [D1 | Acc]); -find_to_boot(A1, [_ | R2], Acc) -> - find_to_boot(A1, R2, Acc); -find_to_boot(Rest, [], Acc) -> - % No more candidates for already running. Boot all. - Rest ++ Acc. - -% Elements unique to the running set need to be killed. -find_to_stop([], Rest, Acc) -> - % The rest haven't been found, so they must all - % be ready to die. - Rest ++ Acc; -find_to_stop([D | R1], [D | R2], Acc) -> - % Elements are equal, daemon already running. 
- find_to_stop(R1, R2, Acc); -find_to_stop([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 -> - find_to_stop(R1, A2, Acc); -find_to_stop(A1, [D2 | R2], Acc) -> - find_to_stop(A1, R2, [D2 | Acc]); -find_to_stop(_, [], Acc) -> - % No more running daemons to worry about. - Acc. - -should_halt(Errors) -> - RetryTimeCfg = config:get("os_daemon_settings", "retry_time", "5"), - RetryTime = list_to_integer(RetryTimeCfg), - - Now = now(), - RecentErrors = lists:filter(fun(Time) -> - timer:now_diff(Now, Time) =< RetryTime * 1000000 - end, Errors), - - RetryCfg = config:get("os_daemon_settings", "max_retries", "3"), - Retries = list_to_integer(RetryCfg), - - {length(RecentErrors) >= Retries, RecentErrors}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_os_process.erl ---------------------------------------------------------------------- diff --git a/src/couch/src/couch_os_process.erl b/src/couch/src/couch_os_process.erl deleted file mode 100644 index c6e6520..0000000 --- a/src/couch/src/couch_os_process.erl +++ /dev/null @@ -1,285 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_os_process). --behaviour(gen_server). - --export([start_link/1, start_link/2, start_link/3, stop/1]). --export([set_timeout/2, prompt/2, prompt_many/2, killer/1]). --export([send/2, writeline/2, readline/1, writejson/2, readjson/1]). --export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). 
- --include_lib("couch/include/couch_db.hrl"). - --define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]). - --record(os_proc, - {command, - port, - writer, - reader, - timeout=5000, - idle - }). - -start_link(Command) -> - start_link(Command, []). -start_link(Command, Options) -> - start_link(Command, Options, ?PORT_OPTIONS). -start_link(Command, Options, PortOptions) -> - gen_server:start_link(couch_os_process, [Command, Options, PortOptions], []). - -stop(Pid) -> - gen_server:cast(Pid, stop). - -% Read/Write API -set_timeout(Pid, TimeOut) when is_integer(TimeOut) -> - ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity). - -% Used by couch_db_update_notifier.erl -send(Pid, Data) -> - gen_server:cast(Pid, {send, Data}). - -prompt(Pid, Data) -> - case gen_server:call(Pid, {prompt, Data}, infinity) of - {ok, Result} -> - Result; - Error -> - ?LOG_ERROR("OS Process Error ~p :: ~p",[Pid,Error]), - throw(Error) - end. - -prompt_many(Pid, DataList) -> - OsProc = gen_server:call(Pid, get_os_proc, infinity), - true = port_connect(OsProc#os_proc.port, self()), - try - send_many(OsProc, DataList), - receive_many(length(DataList), OsProc, []) - after - % Can throw badarg error, when OsProc Pid is dead or port was closed - % by the readline function on error/timeout. - (catch port_connect(OsProc#os_proc.port, Pid)), - unlink(OsProc#os_proc.port), - drop_port_messages(OsProc#os_proc.port) - end. - -send_many(_OsProc, []) -> - ok; -send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) -> - Writer(OsProc, Data), - send_many(OsProc, Rest). - -receive_many(0, _OsProc, Acc) -> - {ok, lists:reverse(Acc)}; -receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) -> - Line = Reader(OsProc), - receive_many(N - 1, OsProc, [Line | Acc]). - -drop_port_messages(Port) -> - receive - {Port, _} -> - drop_port_messages(Port) - after 0 -> - ok - end. 
- -% Utility functions for reading and writing -% in custom functions -writeline(OsProc, Data) when is_record(OsProc, os_proc) -> - port_command(OsProc#os_proc.port, [Data, $\n]). - -readline(#os_proc{} = OsProc) -> - readline(OsProc, []). -readline(#os_proc{port = Port} = OsProc, Acc) -> - receive - {Port, {data, {noeol, Data}}} when is_binary(Acc) -> - readline(OsProc, <<Acc/binary,Data/binary>>); - {Port, {data, {noeol, Data}}} when is_binary(Data) -> - readline(OsProc, Data); - {Port, {data, {noeol, Data}}} -> - readline(OsProc, [Data|Acc]); - {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) -> - [<<Acc/binary,Data/binary>>]; - {Port, {data, {eol, Data}}} when is_binary(Data) -> - [Data]; - {Port, {data, {eol, Data}}} -> - lists:reverse(Acc, Data); - {Port, Err} -> - catch port_close(Port), - throw({os_process_error, Err}) - after OsProc#os_proc.timeout -> - catch port_close(Port), - throw({os_process_error, "OS process timed out."}) - end. - -% Standard JSON functions -writejson(OsProc, Data) when is_record(OsProc, os_proc) -> - JsonData = ?JSON_ENCODE(Data), - ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]), - true = writeline(OsProc, JsonData). - -readjson(OsProc) when is_record(OsProc, os_proc) -> - Line = iolist_to_binary(readline(OsProc)), - ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), - try - % Don't actually parse the whole JSON. Just try to see if it's - % a command or a doc map/reduce/filter/show/list/update output. - % If it's a command then parse the whole JSON and execute the - % command, otherwise return the raw JSON line to the caller. - pick_command(Line) - catch - throw:abort -> - {json, Line}; - throw:{cmd, _Cmd} -> - case ?JSON_DECODE(Line) of - [<<"log">>, Msg] when is_binary(Msg) -> - % we got a message to log.
Log it and continue - ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]), - readjson(OsProc); - [<<"error">>, Id, Reason] -> - throw({error, {couch_util:to_existing_atom(Id),Reason}}); - [<<"fatal">>, Id, Reason] -> - ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p", - [OsProc#os_proc.port, Id, Reason]), - throw({couch_util:to_existing_atom(Id),Reason}); - _Result -> - {json, Line} - end - end. - -pick_command(Line) -> - json_stream_parse:events(Line, fun pick_command0/1). - -pick_command0(array_start) -> - fun pick_command1/1; -pick_command0(_) -> - throw(abort). - -pick_command1(<<"log">> = Cmd) -> - throw({cmd, Cmd}); -pick_command1(<<"error">> = Cmd) -> - throw({cmd, Cmd}); -pick_command1(<<"fatal">> = Cmd) -> - throw({cmd, Cmd}); -pick_command1(_) -> - throw(abort). - - -% gen_server API -init([Command, Options, PortOptions]) -> - PrivDir = couch_util:priv_dir(), - Spawnkiller = filename:join(PrivDir, "couchspawnkillable"), - V = config:get("query_server_config", "os_process_idle_limit", "300"), - IdleLimit = list_to_integer(V) * 1000, - BaseProc = #os_proc{ - command=Command, - port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions), - writer=fun ?MODULE:writejson/2, - reader=fun ?MODULE:readjson/1, - idle=IdleLimit - }, - KillCmd = iolist_to_binary(readline(BaseProc)), - Pid = self(), - ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]), - spawn(fun() -> - % this ensure the real os process is killed when this process dies. - erlang:monitor(process, Pid), - receive _ -> ok end, - killer(?b2l(KillCmd)) - end), - OsProc = - lists:foldl(fun(Opt, Proc) -> - case Opt of - {writer, Writer} when is_function(Writer) -> - Proc#os_proc{writer=Writer}; - {reader, Reader} when is_function(Reader) -> - Proc#os_proc{reader=Reader}; - {timeout, TimeOut} when is_integer(TimeOut) -> - Proc#os_proc{timeout=TimeOut} - end - end, BaseProc, Options), - {ok, OsProc, IdleLimit}. 
- -terminate(_Reason, #os_proc{port=Port}) -> - catch port_close(Port), - ok. - -handle_call(get_os_proc, _From, #os_proc{idle=Idle}=OsProc) -> - {reply, OsProc, OsProc, Idle}; -handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) -> - {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle}; -handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) -> - #os_proc{writer=Writer, reader=Reader} = OsProc, - try - Writer(OsProc, Data), - {reply, {ok, Reader(OsProc)}, OsProc, Idle} - catch - throw:{error, OsError} -> - {reply, OsError, OsProc, Idle}; - throw:{fatal, OsError} -> - {stop, normal, OsError, OsProc}; - throw:OtherError -> - {stop, normal, OtherError, OsProc} - after - garbage_collect() - end. - -handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) -> - try - Writer(OsProc, Data), - {noreply, OsProc, Idle} - catch - throw:OsError -> - ?LOG_ERROR("Failed sending data: ~p -> ~p", [Data, OsError]), - {stop, normal, OsProc} - end; -handle_cast(stop, OsProc) -> - {stop, normal, OsProc}; -handle_cast(Msg, #os_proc{idle=Idle}=OsProc) -> - ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]), - {noreply, OsProc, Idle}. - -handle_info(timeout, #os_proc{idle=Idle}=OsProc) -> - gen_server:cast(couch_proc_manager, {os_proc_idle, self()}), - erlang:garbage_collect(), - {noreply, OsProc, Idle}; -handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) -> - ?LOG_INFO("OS Process terminated normally", []), - {stop, normal, OsProc}; -handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) -> - ?LOG_ERROR("OS Process died with status: ~p", [Status]), - {stop, {exit_status, Status}, OsProc}; -handle_info(Msg, #os_proc{idle=Idle}=OsProc) -> - ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]), - {noreply, OsProc, Idle}. 
- -code_change(_, {os_proc, Cmd, Port, W, R, Timeout} , _) -> - V = config:get("query_server_config","os_process_idle_limit","300"), - State = #os_proc{ - command = Cmd, - port = Port, - writer = W, - reader = R, - timeout = Timeout, - idle = list_to_integer(V) * 1000 - }, - {ok, State}; -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -killer(KillCmd) -> - receive _ -> - os:cmd(KillCmd) - after 1000 -> - ?MODULE:killer(KillCmd) - end. - http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_passwords.erl ---------------------------------------------------------------------- diff --git a/src/couch/src/couch_passwords.erl b/src/couch/src/couch_passwords.erl deleted file mode 100644 index d0f36cc..0000000 --- a/src/couch/src/couch_passwords.erl +++ /dev/null @@ -1,119 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_passwords). - --export([simple/2, pbkdf2/3, pbkdf2/4, verify/2]). --export([hash_admin_password/1, get_unhashed_admins/0]). - --include_lib("couch/include/couch_db.hrl"). - --define(MAX_DERIVED_KEY_LENGTH, (1 bsl 32 - 1)). --define(SHA1_OUTPUT_LENGTH, 20). - -%% legacy scheme, not used for new passwords. --spec simple(binary(), binary()) -> binary(). -simple(Password, Salt) -> - ?l2b(couch_util:to_hex(crypto:sha(<<Password/binary, Salt/binary>>))). - -%% CouchDB utility functions --spec hash_admin_password(binary()) -> binary().
-hash_admin_password(ClearPassword) -> - Iterations = config:get("couch_httpd_auth", "iterations", "10000"), - Salt = couch_uuids:random(), - DerivedKey = couch_passwords:pbkdf2(couch_util:to_binary(ClearPassword), - Salt ,list_to_integer(Iterations)), - ?l2b("-pbkdf2-" ++ ?b2l(DerivedKey) ++ "," - ++ ?b2l(Salt) ++ "," - ++ Iterations). - --spec get_unhashed_admins() -> list(). -get_unhashed_admins() -> - lists:filter( - fun({_User, "-hashed-" ++ _}) -> - false; % already hashed - ({_User, "-pbkdf2-" ++ _}) -> - false; % already hashed - ({_User, _ClearPassword}) -> - true - end, - config:get("admins")). - -%% Current scheme, much stronger. --spec pbkdf2(binary(), binary(), integer()) -> binary(). -pbkdf2(Password, Salt, Iterations) -> - {ok, Result} = pbkdf2(Password, Salt, Iterations, ?SHA1_OUTPUT_LENGTH), - Result. - --spec pbkdf2(binary(), binary(), integer(), integer()) - -> {ok, binary()} | {error, derived_key_too_long}. -pbkdf2(_Password, _Salt, _Iterations, DerivedLength) - when DerivedLength > ?MAX_DERIVED_KEY_LENGTH -> - {error, derived_key_too_long}; -pbkdf2(Password, Salt, Iterations, DerivedLength) -> - L = ceiling(DerivedLength / ?SHA1_OUTPUT_LENGTH), - <<Bin:DerivedLength/binary,_Rest/binary>> = - iolist_to_binary(pbkdf2(Password, Salt, Iterations, L, 1, [])), - {ok, ?l2b(couch_util:to_hex(Bin))}. - --spec pbkdf2(binary(), binary(), integer(), integer(), integer(), iolist()) - -> iolist(). -pbkdf2(_Password, _Salt, _Iterations, BlockCount, BlockIndex, Acc) - when BlockIndex > BlockCount -> - lists:reverse(Acc); -pbkdf2(Password, Salt, Iterations, BlockCount, BlockIndex, Acc) -> - Block = pbkdf2(Password, Salt, Iterations, BlockIndex, 1, <<>>, <<>>), - pbkdf2(Password, Salt, Iterations, BlockCount, BlockIndex + 1, [Block|Acc]). - --spec pbkdf2(binary(), binary(), integer(), integer(), integer(), - binary(), binary()) -> binary().
-%% Computes one PBKDF2 block. Prev is the HMAC output of the previous
-%% iteration and Acc the running XOR of all outputs so far; Acc is returned
-%% once Iteration exceeds Iterations.
-pbkdf2(_Password, _Salt, Iterations, _BlockIndex, Iteration, _Prev, Acc)
-    when Iteration > Iterations ->
-    Acc;
-pbkdf2(Password, Salt, Iterations, BlockIndex, 1, _Prev, _Acc) ->
-    %% First iteration: HMAC over the salt followed by the block index as a
-    %% 32-bit big-endian integer.
-    InitialBlock = crypto:sha_mac(Password,
-        <<Salt/binary, BlockIndex:32/integer>>),
-    pbkdf2(Password, Salt, Iterations, BlockIndex, 2,
-        InitialBlock, InitialBlock);
-pbkdf2(Password, Salt, Iterations, BlockIndex, Iteration, Prev, Acc) ->
-    Next = crypto:sha_mac(Password, Prev),
-    pbkdf2(Password, Salt, Iterations, BlockIndex, Iteration + 1,
-                   Next, crypto:exor(Next, Acc)).
-
-%% verify two lists for equality without short-circuits to avoid timing attacks.
-%% Every pair of elements is XOR-ed and OR-ed into Result, so the whole list
-%% is always walked; the lists are equal iff Result ends up 0.
--spec verify(string(), string(), integer()) -> boolean().
-verify([X|RestX], [Y|RestY], Result) ->
-    verify(RestX, RestY, (X bxor Y) bor Result);
-verify([], [], Result) ->
-    Result == 0.
-
-%% Compares two binaries or two lists. Binaries are converted to lists
-%% first; lists of different length compare false immediately; any other
-%% combination of arguments is false.
--spec verify(binary(), binary()) -> boolean();
-            (list(), list()) -> boolean().
-verify(<<X/binary>>, <<Y/binary>>) ->
-    verify(?b2l(X), ?b2l(Y));
-verify(X, Y) when is_list(X), is_list(Y) ->
-    case length(X) == length(Y) of
-        true ->
-            verify(X, Y, 0);
-        false ->
-            false
-    end;
-verify(_X, _Y) -> false.
-
-%% Rounds X up to the nearest integer (towards positive infinity).
--spec ceiling(number()) -> integer().
-ceiling(X) ->
-    T = erlang:trunc(X),
-    case (X - T) of
-        Neg when Neg < 0 -> T;
-        Pos when Pos > 0 -> T + 1;
-        _ -> T
-    end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_primary_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
deleted file mode 100644
index 3ce8827..0000000
--- a/src/couch/src/couch_primary_sup.erl
+++ /dev/null
@@ -1,60 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License.
You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_primary_sup).
--behaviour(supervisor).
--export([init/1, start_link/0]).
-
-%% Starts this supervisor, registered locally as couch_primary_services.
-start_link() ->
-    supervisor:start_link({local,couch_primary_services}, ?MODULE, []).
-
-%% Supervisor callback: a one_for_one supervisor (at most 10 restarts within
-%% 3600 seconds) over the children listed below, in this start order.
-init([]) ->
-    RestartStrategy = {one_for_one, 10, 3600},
-    Children = [
-        sup_child(collation_driver, couch_drv),
-        worker_child(couch_task_status,
-            {couch_task_status, start_link, []},
-            [couch_task_status]),
-        worker_child(couch_server,
-            {couch_server, sup_start_link, []},
-            [couch_server]),
-        worker_child(couch_db_update_event,
-            {gen_event, start_link, [{local, couch_db_update}]},
-            dynamic),
-        worker_child(couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            dynamic),
-        sup_child(couch_replicator_job_sup, couch_replicator_job_sup)
-    ],
-    {ok, {RestartStrategy, Children}}.
-
-%% Child spec for a permanent supervisor started through Mod:start_link/0,
-%% with an infinite shutdown timeout.
-sup_child(Id, Mod) ->
-    {Id, {Mod, start_link, []}, permanent, infinity, supervisor, [Mod]}.
-
-%% Child spec for a permanent worker that is shut down with brutal_kill.
-worker_child(Id, StartMFA, Modules) ->
-    {Id, StartMFA, permanent, brutal_kill, worker, Modules}.
-
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_proc_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_proc_manager.erl b/src/couch/src/couch_proc_manager.erl
deleted file mode 100644
index 45b334f..0000000
--- a/src/couch/src/couch_proc_manager.erl
+++ /dev/null
@@ -1,307 +0,0 @@
--module(couch_proc_manager).
--behaviour(gen_server).
--behaviour(config_listener).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-    code_change/3]).
-
--export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
-
-% config_listener api
--export([handle_config_change/5]).
-
--include_lib("couch/include/couch_db.hrl").
-
-%% Server state: `tab` is the ETS table of #proc{} records and `config` the
-%% term returned by get_proc_config/0.
--record(state, {
-    tab,
-    config
-}).
-
-%% Starts the manager as a gen_server registered locally under ?MODULE.
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-%% Asks the registered server for the number of entries in its proc table.
-get_proc_count() ->
-    gen_server:call(?MODULE, get_proc_count).
-
-%% Traps exits, subscribes to config changes and creates the proc table
-%% (an ordered_set keyed on #proc.pid).
-init([]) ->
-    process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, nil),
-    Tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
-    {ok, #state{tab = Tab, config = get_proc_config()}}.
-
-handle_call(get_table, _From, State) ->
-    {reply, State#state.tab, State};
-
-handle_call(get_proc_count, _From, State) ->
-    {reply, ets:info(State#state.tab, size), State};
-
-%% Hand out a proc for a design document. The language is the doc body's
-%% "language" property, defaulting to "javascript".
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    {Client, _} = From,
-    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
-    %% Pass 1: stop at the first proc that already lists DDocKey.
-    PickTaught = fun(Proc, Acc) ->
-        Known = lists:member(DDocKey, Proc#proc.ddoc_keys),
-        if
-            Known -> {stop, assign_proc(State#state.tab, Client, Proc)};
-            true -> {ok, Acc}
-        end
-    end,
-    %% Pass 2: try to teach the ddoc to each proc in turn; any failure for
-    %% one proc simply moves on to the next.
-    TeachOne = fun(Candidate, Acc) ->
-        try
-            {ok, Taught} = teach_ddoc(DDoc, DDocKey, Candidate),
-            {stop, assign_proc(State#state.tab, Client, Taught)}
-        catch _:_ ->
-            {ok, Acc}
-        end
-    end,
-    try iter_procs(State#state.tab, Lang, PickTaught, nil) of
-        {ok, Ready} ->
-            reply_with_proc(Ready, State);
-        {not_found, _} ->
-            case iter_procs(State#state.tab, Lang, TeachOne, nil) of
-                {ok, Fresh} ->
-                    reply_with_proc(Fresh, State);
-                {not_found, _} ->
-                    %% Nothing usable: a linked helper process creates a new
-                    %% proc and the caller is answered later.
-                    spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
-                    {noreply, State}
-            end
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
-
-%% Hand out any proc for the given language.
-handle_call({get_proc, Lang}, {Client, _} = From, State) ->
-    TakeFirst = fun(Proc, _Acc) ->
-        {stop, assign_proc(State#state.tab, Client, Proc)}
-    end,
-    try iter_procs(State#state.tab, Lang, TakeFirst, nil) of
-        {ok, Ready} ->
-            reply_with_proc(Ready, State);
-        {not_found, _} ->
-            spawn_link(?MODULE, new_proc, [From, Lang]),
-            {noreply, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
-
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
-    erlang:demonitor(Ref, [flush]),
-    % The client may hand back a #proc{} whose process is already dead, and
-    % that #proc{} would already have been removed from our table. So
-    % return_proc/2 checks liveness before re-inserting; the alternative
-    % would be a table lookup before the insert. It is not known which of
-    % the two is cheaper.
-    return_proc(State#state.tab, Proc),
-    {reply, true, State};
-
-handle_call(_Call, _From, State) ->
-    {reply, ignored, State}.
-
-%% Builds the reply that hands Proc plus the current proc config to a caller.
-reply_with_proc(Proc, State) ->
-    {reply, {ok, Proc, State#state.config}, State}.
-
-%% os_proc_idle: when Pid is in the table with no client and the table holds
-%% more entries than "os_process_soft_limit", the proc is closed.
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
-    Limit = config:get("query_server_config", "os_process_soft_limit", "100"),
-    Unassigned = case ets:lookup(Tab, Pid) of
-        [#proc{client=nil}] -> true;
-        _ -> false
-    end,
-    %% The limit string is only converted when the proc is unassigned.
-    case Unassigned andalso ets:info(Tab, size) > list_to_integer(Limit) of
-        true -> close_idle_proc(Tab, Pid);
-        false -> ok
-    end,
-    {noreply, State};
-handle_cast(reload_config, State) ->
-    {noreply, State#state{config = get_proc_config()}};
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-%% Removes Pid from the table and, if the process is still alive, unlinks it
-%% and casts `stop` to it.
-close_idle_proc(Tab, Pid) ->
-    ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
-    ets:delete(Tab, Pid),
-    case is_process_alive(Pid) of
-        true ->
-            unlink(Pid),
-            gen_server:cast(Pid, stop);
-        _ ->
-            ok
-    end.
-
-
-handle_info(shutdown, State) ->
-    {stop, shutdown, State};
-
-%% new_proc/2,4 run in a linked helper process that exits with
-%% {ok, Proc, From}. Because init/1 sets trap_exit, that exit arrives here:
-%% link to the new proc, assign it to the waiting client and send the
-%% deferred reply.
-handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
-    link(Proc0#proc.pid),
-    Proc = assign_proc(State#state.tab, Client, Proc0),
-    gen_server:reply(From, {ok, Proc, State#state.config}),
-    {noreply, State};
-
-%% Any other linked process exit: log it and drop its table entry, if any.
-handle_info({'EXIT', Pid, Reason}, State) ->
-    ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
-    ets:delete(State#state.tab, Pid),
-    {noreply, State};
-
-%% A monitored client went down: give back the proc assigned to it, found by
-%% its monitor reference.
-handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
-    case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
-    [] ->
-        ok;
-    [#proc{} = Proc] ->
-        return_proc(State#state.tab, Proc)
-    end,
-    {noreply, State};
-
-%% The config listener was removed: schedule a re-registration in 5000 ms.
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
-    erlang:send_after(5000, self(), restart_config_listener),
-    {noreply, State};
-
-%% Must match the atom sent by the timer above. It was previously spelled
-%% `restart_config_lister`, so the message fell through to the catch-all
-%% clause and the listener was never re-registered.
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
-
-%% Calls couch_util:shutdown_sync/1 on the pid of every proc in the table.
-terminate(_Reason, #state{tab=Tab}) ->
-    ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
-    ok.
-
-%% Code upgrade: recreates the table as an ordered_set keyed on #proc.pid,
-%% copies every row across and deletes the old table.
-code_change(_OldVsn, #state{tab = Tab} = State, _Extra) ->
-    NewTab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
-    true = ets:insert(NewTab, ets:tab2list(Tab)),
-    true = ets:delete(Tab),
-    {ok, State#state{tab = NewTab}}.
-
-%% config_listener callback: a change in the "query_server_config" section
-%% makes the server reload its proc config; other sections are ignored.
-handle_config_change("query_server_config", _, _, _, _) ->
-    gen_server:cast(?MODULE, reload_config),
-    {ok, nil};
-handle_config_change(_, _, _, _, _) ->
-    {ok, nil}.
-
-%% Iterates, in batches of 25 and in reverse key order, over the procs of
-%% language Lang that have no client, applying Fun through iter_procs/3.
-%% A binary Lang is converted to a list first. Returns {not_found, Acc}
-%% when the table has no matching entry.
-iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
-    iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
-iter_procs(Tab, Lang, Fun, Acc) ->
-    Pattern = #proc{lang=Lang, client=nil, _='_'},
-    MSpec = [{Pattern, [], ['$_']}],
-    case ets:select_reverse(Tab, MSpec, 25) of
-        '$end_of_table' ->
-            {not_found, Acc};
-        Continuation ->
-            iter_procs(Continuation, Fun, Acc)
-    end.
-
-%% Walks one {Batch, Continuation} result of ets:select_reverse. Fun is
-%% called as Fun(Proc, Acc): {ok, Acc1} continues with the next proc and
-%% {stop, Acc1} ends the walk with {ok, Acc1}. When a batch is exhausted the
-%% next one is fetched; {not_found, Acc} is returned at the end of the table.
-iter_procs({[], Continuation0}, Fun, Acc) ->
-    case ets:select_reverse(Continuation0) of
-        '$end_of_table' ->
-            {not_found, Acc};
-        Continuation1 ->
-            iter_procs(Continuation1, Fun, Acc)
-    end;
-iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
-    case Fun(Proc, Acc0) of
-        {ok, Acc1} ->
-            iter_procs({Rest, Continuation}, Fun, Acc1);
-        {stop, Acc1} ->
-            {ok, Acc1}
-    end.
-
-%% Entry point of the helper process spawned for {get_proc, Lang}. On
-%% success it exits with {ok, Proc, From}, which the manager receives as an
-%% 'EXIT' message; on failure it answers the waiting caller itself.
-new_proc(From, Lang) ->
-    case new_proc_int(From, Lang) of
-    {ok, Proc} ->
-        exit({ok, Proc, From});
-    {unknown_query_language, _} = Unknown ->
-        %% Sent untagged, as before, and exactly once.
-        gen_server:reply(From, Unknown);
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end.
-
-%% As new_proc/2, but the new proc must also hold the design document
-%% before it is handed over.
-new_proc(From, Lang, DDoc, DDocKey) ->
-    case new_proc_int(From, Lang) of
-    {ok, NewProc} ->
-        case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
-        {ok, Proc} ->
-            exit({ok, Proc, From});
-        {error, Reason} ->
-            gen_server:reply(From, {error, Reason})
-        end;
-    {unknown_query_language, _} = Unknown ->
-        %% Sent untagged, as before, and exactly once.
-        gen_server:reply(From, Unknown);
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end.
-
-%% Starts a query server for Lang: the command configured under
-%% "query_servers" if there is one, otherwise the {M, F, A} spec under
-%% "native_query_servers". Returns {ok, #proc{}} or
-%% {unknown_query_language, Lang}.
-%%
-%% This function used to send the unknown-language reply itself and return
-%% the result of gen_server:reply/2, which the callers then wrapped in a
-%% second reply to the same From. It now only reports the condition and
-%% leaves the single reply to new_proc/2,4.
-new_proc_int(From, Lang) when is_binary(Lang) ->
-    new_proc_int(From, binary_to_list(Lang));
-new_proc_int(_From, Lang) when is_list(Lang) ->
-    case config:get("query_servers", Lang) of
-    undefined ->
-        case config:get("native_query_servers", Lang) of
-        undefined ->
-            {unknown_query_language, Lang};
-        SpecStr ->
-            {ok, {M,F,A}} = couch_util:parse_term(SpecStr),
-            {ok, Pid} = apply(M, F, A),
-            make_proc(Pid, Lang, M)
-        end;
-    Command ->
-        {ok, Pid} = couch_os_process:start_link(Command),
-        make_proc(Pid, Lang, couch_os_process)
-    end.
-
-%% Wraps Pid in a #proc{} whose callback funs all point into Mod, then
-%% unlinks Pid from the calling process.
-make_proc(Pid, Lang, Mod) ->
-    Proc = #proc{
-        lang = Lang,
-        pid = Pid,
-        prompt_fun = {Mod, prompt},
-        prompt_many_fun = {Mod, prompt_many},
-        set_timeout_fun = {Mod, set_timeout},
-        stop_fun = {Mod, stop}
-    },
-    unlink(Pid),
-    {ok, Proc}.
-
-%% Marks a proc without a client as owned by Client: monitors Client, stores
-%% the monitor reference in the `client` field, writes the record to the
-%% table and returns it.
-assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
-    Proc = Proc0#proc{client = erlang:monitor(process, Client)},
-    ets:insert(Tab, Proc),
-    Proc.
-
-%% Puts a proc back into the table with no client if its process is still
-%% alive (after casting it `garbage_collect`); otherwise removes its entry.
-return_proc(Tab, #proc{pid=Pid} = Proc) ->
-    store_or_drop(is_process_alive(Pid), Tab, Proc).
-
-store_or_drop(true, Tab, #proc{pid=Pid} = Proc) ->
-    gen_server:cast(Pid, garbage_collect),
-    ets:insert(Tab, Proc#proc{client=nil});
-store_or_drop(false, Tab, #proc{pid=Pid}) ->
-    ets:delete(Tab, Pid).
-
-%% Builds the config term handed out with every proc: "reduce_limit" as an
-%% atom (default "true") and "timeout" as an integer (default "5000").
-get_proc_config() ->
-    ReduceLimit = list_to_atom(
-        config:get("query_server_config", "reduce_limit", "true")),
-    Timeout = list_to_integer(
-        config:get("couchdb", "os_process_timeout", "5000")),
-    {[
-        {<<"reduce_limit">>, ReduceLimit},
-        {<<"timeout">>, Timeout}
-    ]}.
-
-%% Returns {ok, Proc} for the first proc in Procs that already lists
-%% DDocKey; if there is none, tries to teach the ddoc to one of them.
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
-    case first_with_key(DDocKey, Procs) of
-        {ok, _} = Found ->
-            Found;
-        none ->
-            teach_any_proc(DDoc, DDocKey, Procs)
-    end.
-
-%% Linear search for the first proc whose ddoc_keys contain DDocKey.
-first_with_key(_DDocKey, []) ->
-    none;
-first_with_key(DDocKey, [#proc{ddoc_keys=Keys} = Proc | Rest]) ->
-    case lists:member(DDocKey, Keys) of
-        true -> {ok, Proc};
-        false -> first_with_key(DDocKey, Rest)
-    end.
-
-%% Tries teach_ddoc/3 on each proc in order and returns the first result
-%% that does not raise; {error, noproc} when the list runs out.
-teach_any_proc(_, _, []) ->
-    {error, noproc};
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
-    try
-        teach_ddoc(DDoc, DDocKey, Proc)
-    catch _:_ ->
-        %% deliberate best effort: this proc failed, move on to the next
-        teach_any_proc(DDoc, DDocKey, Rest)
-    end.
-
-%% Sends the design document to the proc and records its key on the proc.
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
-    % Send the ddoc over the wire. The prompt carries only the doc id and
-    % the doc JSON; the rev is kept on our side in ddoc_keys.
-    Prompt = [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])],
-    true = couch_query_servers:proc_prompt(Proc, Prompt),
-    % Drop any other key recorded for this doc id, because the query server
-    % overwrites without the rev.
-    OtherKeys = [Key || {Id, _} = Key <- Keys, Id /= DDocId],
-    % Record the new key on the proc.
-    {ok, Proc#proc{ddoc_keys=[DDocKey|OtherKeys]}}.