couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [43/57] [abbrv] [partial] inital move to rebar compilation
Date Tue, 07 Jan 2014 00:37:03 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_native_process.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_native_process.erl b/apps/couch/src/couch_native_process.erl
new file mode 100644
index 0000000..5a32e75
--- /dev/null
+++ b/apps/couch/src/couch_native_process.erl
@@ -0,0 +1,409 @@
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+%
+% You may obtain a copy of the License at
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing,
+% software distributed under the License is distributed on an
+% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+% either express or implied.
+%
+% See the License for the specific language governing permissions
+% and limitations under the License.
+%
+% This file drew much inspiration from erlview, which was written by and
+% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
+%
+%
+% This module provides the smallest possible native view-server.
+% With this module in-place, you can add the following to your couch INI files:
+%  [native_query_servers]
+%  erlang={couch_native_process, start_link, []}
+%
+% Which will then allow following example map function to be used:
+%
+%  fun({Doc}) ->
+%    % Below, we emit a single record - the _id as key, null as value
+%    DocId = couch_util:get_value(<<"_id">>, Doc, null),
+%    Emit(DocId, null)
+%  end.
+%
+% which should be roughly the same as the javascript:
+%    emit(doc._id, null);
+%
+% This module exposes enough functions such that a native erlang server can
+% act as a fully-fledged view server, but no 'helper' functions specifically
+% for simplifying your erlang view code.  It is expected other third-party
+% extensions will evolve which offer useful layers on top of this view server
+% to help simplify your view code.
+-module(couch_native_process).
+-behaviour(gen_server).
+
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
+         handle_info/2]).
+-export([set_timeout/2, prompt/2]).
+
+-define(STATE, native_proc_state).
+-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
+
+-include("couch_db.hrl").
+
+% Starts an unregistered native query server process linked to the caller.
+% Returns whatever gen_server:start_link/3 returns.
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+% this is a bit messy, see also couch_query_servers handle_info
+% stop(_Pid) ->
+%     ok.
+
+% Synchronously stores TimeOut in the state of server Pid. The run/2 and
+% list helper code use that value as the `after` timeout of their receives.
+set_timeout(Pid, TimeOut) ->
+    gen_server:call(Pid, {set_timeout, TimeOut}).
+
+% Sends one query server command (a list) to Pid and returns its response.
+% Data that is not a list has no matching clause.
+prompt(Pid, Data) when is_list(Data) ->
+    gen_server:call(Pid, {prompt, Data}).
+
+% gen_server callbacks
+% Initial state: an empty design-document dictionary, all other #evstate
+% fields at their record defaults.
+init([]) ->
+    {ok, #evstate{ddocs=dict:new()}}.
+
+% Synchronous requests: {set_timeout, T} stores a new receive timeout,
+% {prompt, Data} runs one query server command and replies with its result.
+handle_call({set_timeout, TimeOut}, _From, State) ->
+    {reply, ok, State#evstate{timeout=TimeOut}};
+
+handle_call({prompt, Data}, _From, State) ->
+    ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
+    % Only throw({error, Why}) is turned into an error response (with the
+    % state left untouched); any other exception leaves this callback.
+    {NewState, Resp} =
+        try run(State, to_binary(Data)) of
+            {_, _} = Result -> Result
+        catch
+            throw:{error, Why} ->
+                {State, [<<"error">>, Why, Why]}
+        end,
+    case Resp of
+        {error, Reason} ->
+            Msg = io_lib:format("couch native server error: ~p", [Reason]),
+            Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)],
+            {reply, Error, NewState};
+        [<<"fatal">> | Rest] ->
+            % TODO: markh? (jan)
+            % The caller sees an "error" response and this server stops.
+            {stop, fatal, [<<"error">> | Rest], NewState};
+        _ ->
+            % Everything else, including [<<"error">> | _], is passed through.
+            {reply, Resp, NewState}
+    end.
+
+% The only cast with a clause is the atom `foo`, which is ignored.
+handle_cast(foo, State) -> {noreply, State}.
+% A normal 'EXIT' message is ignored; an 'EXIT' with any other reason stops
+% this server with that same reason.
+handle_info({'EXIT',_,normal}, State) -> {noreply, State};
+handle_info({'EXIT',_,Reason}, State) ->
+    {stop, Reason, State}.
+% Nothing to clean up on termination.
+terminate(_Reason, _State) -> ok.
+% The state is carried over unchanged by a code upgrade.
+code_change(_OldVersion, State, _Extra) -> {ok, State}.
+
+% run(State, Command) -> {NewState, Response}
+% Executes one query server command against the current state.
+%
+% While a list function is in progress (list_pid holds a pid) only the
+% list_row and list_end commands are served; every other command gets a
+% list_error response. All waits below use State#evstate.timeout and throw
+% {timeout, Where} when it elapses.
+%
+% list_row: forward the row to the list process, then wait for either more
+% chunks or the end of the list.
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+    Pid ! {self(), list_row, Row},
+    receive
+        {Pid, chunks, Data} ->
+            {State, [<<"chunks">>, Data]};
+        {Pid, list_end, Data} ->
+            % wait for the list process to exit before clearing list_pid
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            % restore the trap_exit flag that ddoc/4 saved under do_trap
+            process_flag(trap_exit, erlang:get(do_trap)),
+            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+    after State#evstate.timeout ->
+        throw({timeout, list_row})
+    end;
+% list_end: tell the list process there are no more rows and collect its
+% final chunks.
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+    Pid ! {self(), list_end},
+    Resp =
+    receive
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            [<<"end">>, Data]
+    after State#evstate.timeout ->
+        throw({timeout, list_end})
+    end,
+    process_flag(trap_exit, erlang:get(do_trap)),
+    {State#evstate{list_pid=nil}, Resp};
+run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+    {State, [<<"error">>, list_error, list_error]};
+% reset: start over with a fresh state, keeping only the stored ddocs (and
+% the query config when one is supplied).
+run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
+    {#evstate{ddocs=DDocs}, true};
+run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) ->
+    {#evstate{ddocs=DDocs, query_config=QueryConfig}, true};
+% add_fun: compile the source and append it, preserving registration order.
+run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+    FunInfo = makefun(State, BinFunc),
+    {State#evstate{funs=Funs ++ [FunInfo]}, true};
+% map_doc: run every registered fun on Doc; each fun's emitted pairs are
+% collected in the process dictionary under that fun's Sig.
+run(State, [<<"map_doc">> , Doc]) ->
+    Resp = lists:map(fun({Sig, Fun}) ->
+        erlang:put(Sig, []),
+        Fun(Doc),
+        lists:reverse(erlang:get(Sig))
+    end, State#evstate.funs),
+    {State, Resp};
+% reduce: split the [Key, Value] pairs into parallel key and value lists,
+% keeping their original order.
+run(State, [<<"reduce">>, Funs, KVs]) ->
+    {Keys, Vals} =
+    lists:foldl(fun([K, V], {KAcc, VAcc}) ->
+        {[K | KAcc], [V | VAcc]}
+    end, {[], []}, KVs),
+    Keys2 = lists:reverse(Keys),
+    Vals2 = lists:reverse(Vals),
+    % `catch` means a failing reduce becomes the response term itself
+    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
+run(State, [<<"rereduce">>, Funs, Vals]) ->
+    {State, catch reduce(State, Funs, null, Vals, true)};
+% ddoc new: remember a design document under its id.
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
+    {State#evstate{ddocs=DDocs2}, true};
+% ddoc call: look the design document up and dispatch on the rest.
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+    DDoc = load_ddoc(DDocs, DDocId),
+    ddoc(State, DDoc, Rest);
+run(_, Unknown) ->
+    ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
+    throw({error, unknown_command}).
+    
+% ddoc(State, DDoc, [FunPath, Args]) -> {NewState, Response}
+% Resolves FunPath (a list of keys) inside the design document to function
+% source, compiles it with the design document bound, and hands over to
+% ddoc/4, which dispatches on the first element of FunPath.
+% Throws {error, not_found} or {error, malformed_ddoc} when the path cannot
+% be followed.
+ddoc(State, {DDoc}, [FunPath, Args]) ->
+    % load fun from the FunPath
+    BFun = lists:foldl(fun
+        (Key, {Props}) when is_list(Props) ->
+            % still inside an object: descend one level
+            couch_util:get_value(Key, Props, nil);
+        (_Key, Fun) when is_binary(Fun) ->
+            % already reached a binary; remaining keys are ignored
+            Fun;
+        (_Key, nil) ->
+            throw({error, not_found});
+        (_Key, _Fun) ->
+            throw({error, malformed_ddoc})
+        end, {DDoc}, FunPath),
+    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
+
+% validate_doc_update: the response is whatever the function returns; with
+% `catch`, a crash or throw becomes the response term as well.
+ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
+    {State, (catch apply(Fun, Args))};
+% filters: applies Fun(Doc, Req) to every document and responds with
+% [true, Results], one boolean per document in the order given.
+ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
+    FilterFunWrapper = fun(Doc) ->
+        case catch Fun(Doc, Req) of
+        true -> true;
+        false -> false;
+        {'EXIT', Error} ->
+            % A crashing filter rejects the document. The explicit false
+            % keeps the logger's return value out of the result list.
+            ?LOG_ERROR("~p", [Error]),
+            false
+        end
+    end,
+    Resp = lists:map(FilterFunWrapper, Docs),
+    {State, [true, Resp]};
+% shows: a one-element tuple result (a JSON object) is wrapped as
+% [<<"resp">>, Object]; any other result, lists included, is the response
+% as returned.
+ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
+    case (catch apply(Fun, Args)) of
+        {Props} ->
+            {State, [<<"resp">>, {Props}]};
+        Other ->
+            {State, Other}
+    end;
+% updates: the function must return [JsonDoc, JsonResp]; there is no clause
+% for any other result.
+ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
+    Resp = case (catch apply(Fun, Args)) of
+        [JsonDoc, JsonResp]  ->
+            [<<"up">>, JsonDoc, JsonResp]
+    end,
+    {State, Resp};
+% lists: runs the list function in a linked helper process and returns once
+% that process has sent its `start` message. The helper pid is stored in
+% list_pid so the list_row / list_end clauses of run/2 can talk to it.
+ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
+    Self = self(),
+    SpawnFun = fun() ->
+        LastChunk = (catch apply(Fun, Args)),
+        % If the function finished without the response having been started,
+        % start it now and swallow the one request the server sends next.
+        case start_list_resp(Self, Sig) of
+            started ->
+                receive
+                    {Self, list_row, _Row} -> ignore;
+                    {Self, list_end} -> ignore
+                after State#evstate.timeout ->
+                    throw({timeout, list_cleanup_pid})
+                end;
+            _ ->
+                ok
+        end,
+        % chunks are kept newest-first under Sig; the function's return
+        % value is the final chunk
+        LastChunks =
+        case erlang:get(Sig) of
+            undefined -> [LastChunk];
+            OtherChunks -> [LastChunk | OtherChunks]
+        end,
+        Self ! {self(), list_end, lists:reverse(LastChunks)}
+    end,
+    % trap exits while the helper runs; the previous flag is saved under
+    % do_trap and restored by run/2 when the list ends
+    erlang:put(do_trap, process_flag(trap_exit, true)),
+    Pid = spawn_link(SpawnFun),
+    Resp =
+    receive
+        {Pid, start, Chunks, JsonResp} ->
+            [<<"start">>, Chunks, JsonResp]
+    after State#evstate.timeout ->
+        throw({timeout, list_start})
+    end,
+    {State#evstate{list_pid=Pid}, Resp}.
+
+% Returns DDocs with DDoc stored under DDocId, replacing any previous entry.
+store_ddoc(DDocs, DDocId, DDoc) ->
+    dict:store(DDocId, DDoc, DDocs).
+% load_ddoc(DDocs, DDocId) -> {Props}
+% Returns the design document stored under DDocId.
+% Throws {error, Message} (Message is a binary) when no document is stored
+% under that id. Only the missing-key case is turned into that error; any
+% other failure is left to surface instead of being reported as a missing
+% document.
+load_ddoc(DDocs, DDocId) ->
+    case dict:find(DDocId, DDocs) of
+        {ok, {DDoc}} ->
+            {DDoc};
+        error ->
+            Msg = io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]),
+            throw({error, iolist_to_binary(Msg)})
+    end.
+
+% bindings(State, Sig) / bindings(State, Sig, DDoc) -> [{Name, Fun}]
+% Builds the variables that user-supplied function source can refer to when
+% makefun/3 evaluates it: Log, Emit, Start, Send, GetRow and FoldRows, plus
+% DDoc when a design document ({Props}) is given.
+% Sig is the process dictionary key under which emitted rows / sent chunks
+% of the function are accumulated (newest first).
+bindings(State, Sig) ->
+    bindings(State, Sig, nil).
+bindings(State, Sig, DDoc) ->
+    Self = self(),
+
+    % The message is passed as data to a fixed format so that a "~" in
+    % user-provided text is not interpreted as a format directive.
+    Log = fun(Msg) ->
+        ?LOG_INFO("~s", [Msg])
+    end,
+
+    % Emit(Key, Value): prepend one row to the rows collected under Sig.
+    Emit = fun(Id, Value) ->
+        Curr = erlang:get(Sig),
+        erlang:put(Sig, [[Id, Value] | Curr])
+    end,
+
+    % Start(Headers): remember the response headers for start_list_resp/2.
+    Start = fun(Headers) ->
+        erlang:put(list_headers, Headers)
+    end,
+
+    % Send(Chunk): prepend one chunk to the chunks collected under Sig.
+    Send = fun(Chunk) ->
+        Curr =
+        case erlang:get(Sig) of
+            undefined -> [];
+            Else -> Else
+        end,
+        erlang:put(Sig, [Chunk | Curr])
+    end,
+
+    % GetRow(): hand the chunks collected so far to the server (as the
+    % `start` message on first use, as `chunks` afterwards), then wait for
+    % the next row. Returns nil at the end of the list.
+    GetRow = fun() ->
+        case start_list_resp(Self, Sig) of
+            started ->
+                ok;
+            _ ->
+                Chunks =
+                case erlang:get(Sig) of
+                    undefined -> [];
+                    CurrChunks -> CurrChunks
+                end,
+                Self ! {self(), chunks, lists:reverse(Chunks)}
+        end,
+        erlang:put(Sig, []),
+        receive
+            {Self, list_row, Row} -> Row;
+            {Self, list_end} -> nil
+        after State#evstate.timeout ->
+            throw({timeout, list_pid_getrow})
+        end
+    end,
+
+    % FoldRows(Fun, Acc): fold Fun over the rows delivered by GetRow.
+    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
+
+    Bindings = [
+        {'Log', Log},
+        {'Emit', Emit},
+        {'Start', Start},
+        {'Send', Send},
+        {'GetRow', GetRow},
+        {'FoldRows', FoldRows}
+    ],
+    case DDoc of
+        {_Props} ->
+            Bindings ++ [{'DDoc', DDoc}];
+        _Else -> Bindings
+    end.
+
+% thanks to erlview, via:
+% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
+% makefun(State, Source) -> {Sig, Fun}
+% makefun(State, Source, {DDoc}) -> {Sig, Fun}
+% makefun(State, Source, BindFuns) -> Fun      (BindFuns is a list)
+%
+% Compiles function source (a binary) into a fun by scanning, parsing and
+% evaluating it with erl_eval. Sig is the md5 of the source (combined with
+% the serialized design document in the {DDoc} form) and is the key the
+% bindings use for their accumulated rows / chunks.
+makefun(State, Source) ->
+    Sig = couch_util:md5(Source),
+    BindFuns = bindings(State, Sig),
+    {Sig, makefun(State, Source, BindFuns)}.
+makefun(State, Source, {DDoc}) ->
+    Sig = couch_util:md5(lists:flatten([Source, term_to_binary(DDoc)])),
+    BindFuns = bindings(State, Sig, {DDoc}),
+    {Sig, makefun(State, Source, BindFuns)};
+makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
+    FunStr = binary_to_list(Source),
+    % a scan error fails this match
+    {ok, Tokens, _} = erl_scan:string(FunStr),
+    % exactly one parsed expression is expected; a parse error is printed
+    % to standard_error and re-thrown as {error, ErrorInfo}
+    Form = case (catch erl_parse:parse_exprs(Tokens)) of
+        {ok, [ParsedForm]} ->
+            ParsedForm;
+        {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
+            io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]),
+            io:format(standard_error, "~s~p~n", [Mesg, Params]),
+            throw(Error)
+    end,
+    % expose every {Name, Fun} pair as a variable of the evaluated source
+    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
+        erl_eval:add_binding(Name, Fun, Acc)
+    end, erl_eval:new_bindings(), BindFuns),
+    {value, Fun, _} = erl_eval:expr(Form, Bindings),
+    Fun.
+
+% reduce(State, BinFuns, Keys, Vals, ReReduce) -> [true, Reductions]
+% BinFuns is either a list of function sources or a single source. Every
+% source is compiled first, then each resulting fun is applied to
+% (Keys, Vals, ReReduce); Reductions holds the results in source order.
+reduce(State, BinFuns, Keys, Vals, ReReduce) ->
+    Sources = case BinFuns of
+        _ when is_list(BinFuns) -> BinFuns;
+        _ -> [BinFuns]
+    end,
+    Compiled = [makefun(State, Source) || Source <- Sources],
+    Reds = [Fun(Keys, Vals, ReReduce) || {_Sig, Fun} <- Compiled],
+    [true, Reds].
+
+% foldrows(GetRow, ProcRow, Acc) -> {ok, FinalAcc}
+% Pulls rows from GetRow() until it returns nil, threading Acc through
+% ProcRow(Row, Acc). ProcRow answers {ok, Acc2} to continue or {stop, Acc2}
+% to end the fold early.
+foldrows(GetRow, ProcRow, Acc) ->
+    foldrows_step(GetRow(), GetRow, ProcRow, Acc).
+
+% Handles one fetched row; nil marks the end of the rows.
+foldrows_step(nil, _GetRow, _ProcRow, Acc) ->
+    {ok, Acc};
+foldrows_step(Row, GetRow, ProcRow, Acc) ->
+    case (catch ProcRow(Row, Acc)) of
+        {ok, NextAcc} ->
+            foldrows(GetRow, ProcRow, NextAcc);
+        {stop, LastAcc} ->
+            {ok, LastAcc}
+    end.
+
+% start_list_resp(Self, Sig) -> started | ok
+% On first use in this process, sends {self(), start, Chunks, Headers} to
+% Self, marks the response as started, clears the chunks kept under Sig and
+% returns `started`. On later calls it does nothing and returns `ok`.
+start_list_resp(Self, Sig) ->
+    send_list_start(erlang:get(list_started), Self, Sig).
+
+% First argument is the current value of the list_started flag.
+send_list_start(undefined, Self, Sig) ->
+    % headers default to an empty headers object when Start was never called
+    Headers = case erlang:get(list_headers) of
+        undefined -> {[{<<"headers">>, {[]}}]};
+        Stored -> Stored
+    end,
+    % chunks are stored newest-first
+    Pending = case erlang:get(Sig) of
+        undefined -> [];
+        Collected -> Collected
+    end,
+    Self ! {self(), start, lists:reverse(Pending), Headers},
+    erlang:put(list_started, true),
+    erlang:put(Sig, []),
+    started;
+send_list_start(_AlreadyStarted, _Self, _Sig) ->
+    ok.
+
+% to_binary(Term) -> Term2
+% Walks a decoded JSON term ({Props} objects and lists) and replaces every
+% atom other than null, true and false with the binary of its name, in keys
+% as well as values. All other terms are returned unchanged.
+to_binary({Props}) ->
+    {lists:map(fun to_binary_pair/1, Props)};
+to_binary(Items) when is_list(Items) ->
+    [to_binary(Item) || Item <- Items];
+to_binary(Literal) when Literal =:= null; Literal =:= true; Literal =:= false ->
+    Literal;
+to_binary(Atom) when is_atom(Atom) ->
+    list_to_binary(atom_to_list(Atom));
+to_binary(Other) ->
+    Other.
+
+% Converts both halves of one object member.
+to_binary_pair({Key, Value}) ->
+    {to_binary(Key), to_binary(Value)}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_os_daemons.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_os_daemons.erl b/apps/couch/src/couch_os_daemons.erl
new file mode 100644
index 0000000..cac031a
--- /dev/null
+++ b/apps/couch/src/couch_os_daemons.erl
@@ -0,0 +1,374 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(couch_os_daemons).
+-behaviour(gen_server).
+
+-export([start_link/0, info/0, info/1, config_change/2]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(daemon, {
+    port,
+    name,
+    cmd,
+    kill,
+    status=running,
+    cfg_patterns=[],
+    errors=[],
+    buf=[]
+}).
+
+-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
+-define(TIMEOUT, 5000).
+
+% Starts the daemon manager, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+% info() / info(Options) -> {ok, Info}
+% Asks the registered server for its daemon table. With `table` in Options
+% Info is the list of #daemon records, otherwise it is the table itself.
+info() ->
+    info([]).
+
+info(Options) ->
+    gen_server:call(?MODULE, {daemon_info, Options}).
+
+% Configuration callback (registered in init/1): forwards the changed
+% section and key to the server as an asynchronous cast.
+config_change(Section, Key) ->
+    gen_server:cast(?MODULE, {config_change, Section, Key}).
+
+% Traps exits, registers for configuration changes, creates the daemon
+% table (keyed on the port field of #daemon) and boots the configured
+% daemons. The table is the server state.
+init(_) ->
+    process_flag(trap_exit, true),
+    ok = couch_config:register(fun ?MODULE:config_change/2),
+    Table = ets:new(?MODULE, [protected, set, {keypos, #daemon.port}]),
+    reload_daemons(Table),
+    {ok, Table}.
+
+% Stops every daemon recorded in the table when the server shuts down.
+terminate(_Reason, Table) ->
+    lists:foreach(fun stop_port/1, ets:tab2list(Table)).
+
+% {daemon_info, Options}: replies {ok, Records} when Options contains
+% `table`, otherwise {ok, Table}. Any other call is logged and stops the
+% server without a reply.
+handle_call({daemon_info, Options}, _From, Table) when is_list(Options) ->
+    Info = case lists:member(table, Options) of
+        true -> ets:tab2list(Table);
+        false -> Table
+    end,
+    {reply, {ok, Info}, Table};
+handle_call(Msg, From, Table) ->
+    ?LOG_ERROR("Unknown call message to ~p from ~p: ~p", [?MODULE, From, Msg]),
+    {stop, error, Table}.
+
+% config_change: restart the daemons that registered for this section/key,
+% and re-read the daemon list when the "os_daemons" section itself changed.
+handle_cast({config_change, Sect, Key}, Table) ->
+    restart_daemons(Table, Sect, Key),
+    case Sect of
+        "os_daemons" -> reload_daemons(Table);
+        _ -> ok
+    end,
+    {noreply, Table};
+% stop: normal shutdown.
+handle_cast(stop, Table) ->
+    {stop, normal, Table};
+% Anything else is logged and stops the server.
+handle_cast(Msg, Table) ->
+    ?LOG_ERROR("Unknown cast message to ~p: ~p", [?MODULE, Msg]),
+    {stop, error, Table}.
+
+% An 'EXIT' message for a port. What happens depends on the status recorded
+% for that port:
+%   unknown port - logged only
+%   stopping     - the record is removed
+%   restarting   - the command is started again under a fresh port
+%   halted       - logged; the record is kept
+%   otherwise    - logged as an invalid state
+handle_info({'EXIT', Port, Reason}, Table) ->
+    case ets:lookup(Table, Port) of
+        [] ->
+            ?LOG_INFO("Port ~p exited after stopping: ~p~n", [Port, Reason]);
+        [#daemon{status=stopping}] ->
+            true = ets:delete(Table, Port);
+        [#daemon{name=Name, status=restarting}=D] ->
+            % ~p, not ~P: ~P consumes a second (depth) argument, which is
+            % not supplied here
+            ?LOG_INFO("Daemon ~p restarting after config change.", [Name]),
+            true = ets:delete(Table, Port),
+            {ok, Port2} = start_port(D#daemon.cmd),
+            % kill is reset so the new port's first line is taken as its
+            % kill command again
+            true = ets:insert(Table, D#daemon{
+                port=Port2, status=running, kill=undefined, buf=[]
+            });
+        [#daemon{name=Name, status=halted}] ->
+            ?LOG_ERROR("Halted daemon process: ~p", [Name]);
+        [D] ->
+            ?LOG_ERROR("Invalid port state at exit: ~p", [D])
+    end,
+    {noreply, Table};
+% A closed port is handled like an exit with status `closed`.
+handle_info({Port, closed}, Table) ->
+    handle_info({Port, {exit_status, closed}}, Table);
+% The OS process behind a port exited.
+%   unknown port - stops the server
+%   restarting   - the command is started again under a fresh port
+%   stopping     - the record is removed
+%   otherwise    - the exit is recorded as an error; the daemon is rebooted
+%                  unless should_halt/1 decides it failed too often
+handle_info({Port, {exit_status, Status}}, Table) ->
+    case ets:lookup(Table, Port) of
+        [] ->
+            ?LOG_ERROR("Unknown port ~p exiting ~p", [Port, Status]),
+            {stop, {error, unknown_port_died, Status}, Table};
+        [#daemon{name=Name, status=restarting}=D] ->
+            % ~p, not ~P: ~P consumes a second (depth) argument, which is
+            % not supplied here
+            ?LOG_INFO("Daemon ~p restarting after config change.", [Name]),
+            true = ets:delete(Table, Port),
+            {ok, Port2} = start_port(D#daemon.cmd),
+            true = ets:insert(Table, D#daemon{
+                port=Port2, status=running, kill=undefined, buf=[]
+            }),
+            {noreply, Table};
+        [#daemon{status=stopping}=D] ->
+            % The configuration changed and this daemon is no
+            % longer needed.
+            ?LOG_DEBUG("Port ~p shut down.", [D#daemon.name]),
+            true = ets:delete(Table, Port),
+            {noreply, Table};
+        [D] ->
+            % Port died for unknown reason. Check to see if it's
+            % died too many times or if we should boot it back up.
+            case should_halt([now() | D#daemon.errors]) of
+                {true, _} ->
+                    % Halting the process. We won't try and reboot
+                    % until the configuration changes.
+                    Fmt = "Daemon ~p halted with exit_status ~p",
+                    ?LOG_ERROR(Fmt, [D#daemon.name, Status]),
+                    D2 = D#daemon{status=halted, errors=nil, buf=nil},
+                    true = ets:insert(Table, D2),
+                    {noreply, Table};
+                {false, Errors} ->
+                    % We're guessing it was a random error, this daemon
+                    % has behaved so we'll give it another chance.
+                    Fmt = "Daemon ~p is being rebooted after exit_status ~p",
+                    ?LOG_INFO(Fmt, [D#daemon.name, Status]),
+                    true = ets:delete(Table, Port),
+                    {ok, Port2} = start_port(D#daemon.cmd),
+                    % only the recent errors are carried over
+                    true = ets:insert(Table, D#daemon{
+                        port=Port2, status=running, kill=undefined,
+                                                errors=Errors, buf=[]
+                    }),
+                    {noreply, Table}
+            end;
+        _Else ->
+            throw(error)
+    end;
+% Port output. {noeol, Data} is a fragment of a line longer than the port's
+% line buffer and is stored until {eol, Data} completes the line.
+handle_info({Port, {data, {noeol, Data}}}, Table) ->
+    [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
+    % fragments are buffered newest-first
+    true = ets:insert(Table, D#daemon{buf=[Data | Buf]}),
+    {noreply, Table};
+handle_info({Port, {data, {eol, Data}}}, Table) ->
+    [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
+    % Join the buffered fragments and the final piece into one binary, so
+    % that a line split across several fragments is handled exactly like a
+    % line that arrived in one piece.
+    Line = iolist_to_binary(lists:reverse([Data | Buf])),
+    % The first line echoed back is the kill command
+    % for when we go to get rid of the port. Lines after
+    % that are considered part of the stdio API.
+    case D#daemon.kill of
+        undefined ->
+            true = ets:insert(Table, D#daemon{kill=?b2l(Line), buf=[]});
+        _Else ->
+            D2 = case (catch ?JSON_DECODE(Line)) of
+                {invalid_json, Rejected} ->
+                    ?LOG_ERROR("Ignoring OS daemon request: ~p", [Rejected]),
+                    D;
+                JSON ->
+                    {ok, D3} = handle_port_message(D, JSON),
+                    D3
+            end,
+            true = ets:insert(Table, D2#daemon{buf=[]})
+    end,
+    {noreply, Table};
+% Any other two-element message is treated as unexpected port output: the
+% daemon is stopped and marked for restart.
+handle_info({Port, Error}, Table) ->
+    ?LOG_ERROR("Unexpectd message from port ~p: ~p", [Port, Error]),
+    % stop_port/1 only has clauses for #daemon{} records, so the record is
+    % looked up first and passed instead of the bare port.
+    [D] = ets:lookup(Table, Port),
+    stop_port(D),
+    true = ets:insert(Table, D#daemon{status=restarting, buf=nil}),
+    {noreply, Table};
+% Everything else is logged and stops the server.
+handle_info(Msg, Table) ->
+    ?LOG_ERROR("Unexpected info message to ~p: ~p", [?MODULE, Msg]),
+    {stop, error, Table}.
+
+% The state is carried over unchanged by a code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Internal API
+
+%
+% Port management helpers
+%
+
+% start_port(Command) / start_port(Command, EnvPairs) -> {ok, Port}
+% Opens a port that runs Command through the "couchspawnkillable" helper
+% found in the priv directory, using ?PORT_OPTIONS plus an env option built
+% from EnvPairs.
+start_port(Command) ->
+    start_port(Command, []).
+
+start_port(Command, EnvPairs) ->
+    PrivDir = couch_util:priv_dir(),
+    Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
+    % merge EnvPairs with an env entry of ?PORT_OPTIONS, if there is one
+    Opts = case lists:keytake(env, 1, ?PORT_OPTIONS) of
+        false ->
+            ?PORT_OPTIONS ++ [ {env,EnvPairs} ];
+        {value, {env,OldPairs}, SubOpts} ->
+            AllPairs = lists:keymerge(1, EnvPairs, OldPairs),
+            SubOpts ++ [ {env,AllPairs} ]
+    end,
+    Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, Opts),
+    {ok, Port}.
+
+
+% stop_port(Daemon): stops the OS process of a #daemon record.
+% Without a recorded kill command the port is only closed (and that is
+% logged as an error); otherwise the kill command is run through os:cmd/1
+% before the port is closed. Errors from port_close/1 are ignored.
+stop_port(#daemon{port=Port, kill=undefined}=D) ->
+    ?LOG_ERROR("Stopping daemon without a kill command: ~p", [D#daemon.name]),
+    catch port_close(Port);
+stop_port(#daemon{port=Port}=D) ->
+    ?LOG_DEBUG("Stopping daemon: ~p", [D#daemon.name]),
+    os:cmd(D#daemon.kill),
+    catch port_close(Port).
+
+
+% handle_port_message(Daemon, Request) -> {ok, Daemon2}
+% Serves one decoded JSON request received from a daemon:
+%   ["get", Section]            - writes the section as a JSON object
+%   ["get", Section, Key]       - writes the value, or null when unset
+%   ["register", Section]       - restart the daemon when Section changes
+%   ["register", Section, Key]  - restart the daemon when that key changes
+%   ["log", Msg]                - logs Msg at level "info"
+%   ["log", Msg, {Opts}]        - logs Msg at the "level" given in Opts
+% Replies are written to the port as one line. Any other request is logged
+% and ignored.
+handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section]) ->
+    KVs = couch_config:get(Section),
+    Data = lists:map(fun({K, V}) -> {?l2b(K), ?l2b(V)} end, KVs),
+    Json = iolist_to_binary(?JSON_ENCODE({Data})),
+    port_command(Port, <<Json/binary, "\n">>),
+    {ok, Daemon};
+handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section, Key]) ->
+    Value = case couch_config:get(Section, Key, null) of
+        null -> null;
+        String -> ?l2b(String)
+    end,
+    Json = iolist_to_binary(?JSON_ENCODE(Value)),
+    port_command(Port, <<Json/binary, "\n">>),
+    {ok, Daemon};
+% section-wide patterns are stored as {Section}, key patterns as
+% {Section, Key}; restart_daemons/4 looks for exactly these shapes
+handle_port_message(Daemon, [<<"register">>, Sec]) when is_binary(Sec) ->
+    Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [{?b2l(Sec)}]),
+    {ok, Daemon#daemon{cfg_patterns=Patterns}};
+handle_port_message(Daemon, [<<"register">>, Sec, Key])
+                        when is_binary(Sec) andalso is_binary(Key) ->
+    Pattern = {?b2l(Sec), ?b2l(Key)},
+    Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [Pattern]),
+    {ok, Daemon#daemon{cfg_patterns=Patterns}};
+handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg]) ->
+    handle_log_message(Name, Msg, <<"info">>),
+    {ok, Daemon};
+handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg, {Opts}]) ->
+    Level = couch_util:get_value(<<"level">>, Opts, <<"info">>),
+    handle_log_message(Name, Msg, Level),
+    {ok, Daemon};
+handle_port_message(#daemon{name=Name}=Daemon, Else) ->
+    ?LOG_ERROR("Daemon ~p made invalid request: ~p", [Name, Else]),
+    {ok, Daemon}.
+
+
+% handle_log_message(Name, Msg, Level)
+% Writes a daemon's log request to the server log. Msg must be a binary;
+% Level is <<"debug">>, <<"info">> or <<"error">>. An unrecognised level is
+% reported as an error and the message is then logged at info level.
+handle_log_message(Name, Msg, _Level) when not is_binary(Msg) ->
+    ?LOG_ERROR("Invalid log message from daemon ~p: ~p", [Name, Msg]);
+handle_log_message(Name, Msg, <<"debug">>) ->
+    ?LOG_DEBUG("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, <<"info">>) ->
+    ?LOG_INFO("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, <<"error">>) ->
+    ?LOG_ERROR("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, Level) ->
+    ?LOG_ERROR("Invalid log level from daemon: ~p", [Level]),
+    ?LOG_INFO("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]).
+
+%
+% Daemon management helpers
+%
+
+% Brings the table in line with the "os_daemons" configuration section:
+% halted records that are no longer configured are dropped, running daemons
+% that are no longer configured are stopped, and configured daemons that
+% are not running are booted.
+reload_daemons(Table) ->
+    Configured = lists:sort(couch_config:get("os_daemons")),
+
+    Halted = daemons_with_status(Table, halted),
+    ok = stop_os_daemons(Table, find_to_stop(Configured, Halted, [])),
+
+    % read only after the halted records were handled
+    Running = daemons_with_status(Table, running),
+    ok = stop_os_daemons(Table, find_to_stop(Configured, Running, [])),
+    ok = boot_os_daemons(Table, find_to_boot(Configured, Running, [])),
+    ok.
+
+% Sorted {Name, Cmd} pairs of all records that currently have Status.
+daemons_with_status(Table, Status) ->
+    MSpec = #daemon{name='$1', cmd='$2', status=Status, _='_'},
+    lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpec)]).
+
+
+% restart_daemons(Table, Sect, Key)
+% Walks the whole table and, for every daemon that registered for section
+% Sect or for the pair {Sect, Key}, stops it and marks it `restarting`.
+restart_daemons(Table, Sect, Key) ->
+    restart_daemons(Table, Sect, Key, ets:first(Table)).
+
+restart_daemons(_, _, _, '$end_of_table') ->
+    ok;
+restart_daemons(Table, Sect, Key, Port) ->
+    [D] = ets:lookup(Table, Port),
+    Patterns = D#daemon.cfg_patterns,
+    Watching = lists:member({Sect}, Patterns)
+        orelse lists:member({Sect, Key}, Patterns),
+    case Watching of
+        true ->
+            stop_port(D),
+            true = ets:insert(Table, D#daemon{status=restarting, buf=nil});
+        false ->
+            ok
+    end,
+    restart_daemons(Table, Sect, Key, ets:next(Table, Port)).
+    
+
+% stop_os_daemons(Table, [{Name, Cmd}]) -> ok
+% For each listed daemon: a halted record is simply deleted; any other
+% daemon is stopped and marked `stopping`.
+stop_os_daemons(Table, Daemons) ->
+    lists:foreach(fun({Name, Cmd}) ->
+        MSpec = #daemon{port='$1', name=Name, cmd=Cmd, _='_'},
+        [[Port]] = ets:match(Table, MSpec),
+        [D] = ets:lookup(Table, Port),
+        case D#daemon.status of
+            halted ->
+                ets:delete(Table, Port);
+            _ ->
+                stop_port(D),
+                Stopping = D#daemon{status=stopping, errors=nil, buf=nil},
+                true = ets:insert(Table, Stopping)
+        end
+    end, Daemons).
+    
+% boot_os_daemons(Table, [{Name, Cmd}]) -> ok
+% Starts a port for each listed daemon, in order, and records it in Table.
+boot_os_daemons(Table, Daemons) ->
+    lists:foreach(fun({Name, Cmd}) ->
+        {ok, Port} = start_port(Cmd),
+        true = ets:insert(Table, #daemon{port=Port, name=Name, cmd=Cmd})
+    end, Daemons).
+    
+% find_to_boot(Configured, Running, Acc) -> ToBoot
+% Both lists must be sorted. Returns the elements that occur only in
+% Configured, i.e. the daemons that still have to be started.
+find_to_boot([], _Running, Acc) ->
+    % Nothing else configured.
+    Acc;
+find_to_boot(Configured, [], Acc) ->
+    % Nothing (else) is running, so all that is left must be booted.
+    Configured ++ Acc;
+find_to_boot([C | Cs] = Configured, [R | Rs] = Running, Acc) ->
+    if
+        C =:= R ->
+            % Already running.
+            find_to_boot(Cs, Rs, Acc);
+        C < R ->
+            % Sorts before every remaining running entry: not running.
+            find_to_boot(Cs, Running, [C | Acc]);
+        true ->
+            % Running entry that is not configured; not relevant here.
+            find_to_boot(Configured, Rs, Acc)
+    end.
+
+% find_to_stop(Configured, Running, Acc) -> ToStop
+% Both lists must be sorted. Returns the elements that occur only in
+% Running, i.e. the daemons that have to be killed.
+find_to_stop([], Running, Acc) ->
+    % Nothing (else) is configured, so everything left must be stopped.
+    Running ++ Acc;
+find_to_stop(_Configured, [], Acc) ->
+    % No more running daemons to worry about.
+    Acc;
+find_to_stop([C | Cs] = Configured, [R | Rs] = Running, Acc) ->
+    if
+        C =:= R ->
+            % Configured and running: keep it.
+            find_to_stop(Cs, Rs, Acc);
+        C < R ->
+            % Configured but not running; not relevant here.
+            find_to_stop(Cs, Running, Acc);
+        true ->
+            % Running but not configured.
+            find_to_stop(Configured, Rs, [R | Acc])
+    end.
+
+% should_halt(Errors) -> {Halt, RecentErrors}
+% Errors is a list of now()-style timestamps of earlier failures.
+% RecentErrors are those no older than the "retry_time" setting (seconds,
+% default "5"); Halt is true when there are at least "max_retries" (default
+% "3") of them. Both settings live in the "os_daemon_settings" section.
+should_halt(Errors) ->
+    Setting = fun(Key, Default) ->
+        list_to_integer(couch_config:get("os_daemon_settings", Key, Default))
+    end,
+
+    RetryTime = Setting("retry_time", "5"),
+    Now = now(),
+    % now_diff/2 is in microseconds
+    RecentErrors = [Time || Time <- Errors,
+                            timer:now_diff(Now, Time) =< RetryTime * 1000000],
+
+    Retries = Setting("max_retries", "3"),
+    {length(RecentErrors) >= Retries, RecentErrors}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_os_process.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_os_process.erl b/apps/couch/src/couch_os_process.erl
new file mode 100644
index 0000000..db62d49
--- /dev/null
+++ b/apps/couch/src/couch_os_process.erl
@@ -0,0 +1,216 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_os_process).
+-behaviour(gen_server).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, prompt/2]).
+-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).
+
+% State of one couch_os_process server.
+-record(os_proc,
+    {command,       % command line passed to start_link
+     port,          % Erlang port opened on the external program
+     writer,        % fun(OsProc, Data) used to send a request
+     reader,        % fun(OsProc) used to read a response
+     timeout=5000   % receive timeout used by readline/2, in milliseconds
+    }).
+
+% Start a linked couch_os_process server for Command.
+% Options and PortOptions default to [] and ?PORT_OPTIONS; all three
+% values are handed to init/1.
+start_link(Command) ->
+    start_link(Command, [], ?PORT_OPTIONS).
+start_link(Command, Options) ->
+    start_link(Command, Options, ?PORT_OPTIONS).
+start_link(Command, Options, PortOptions) ->
+    InitArgs = [Command, Options, PortOptions],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+% Ask the server to stop. Sent as a cast, so this returns without
+% waiting; handle_cast(stop, ...) then stops the server with reason normal.
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+% Read/Write API
+% Replace the read timeout stored in the server state. TimeOut must be
+% an integer; the call itself waits indefinitely for the server's ok.
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+    ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).
+
+% Used by couch_db_update_notifier.erl
+% Write Data to the OS process without waiting for or reading a response.
+send(Pid, Data) ->
+    gen_server:cast(Pid, {send, Data}).
+
+% Send Data to the OS process and wait (without a call timeout) for the
+% server's reply. Returns Result when the server replies {ok, Result};
+% any other reply is logged and re-thrown to the caller.
+prompt(Pid, Data) ->
+    case gen_server:call(Pid, {prompt, Data}, infinity) of
+        {ok, Result} ->
+            Result;
+        Error ->
+            ?LOG_ERROR("OS Process Error ~p :: ~p",[Pid,Error]),
+            throw(Error)
+    end.
+
+% Utility functions for reading and writing
+% in custom functions
+% writeline/2 sends Data followed by a newline to the port held in OsProc.
+writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
+    port_command(OsProc#os_proc.port, [Data, $\n]).
+
+% Read one complete line from the port.
+% A line arrives as zero or more {noeol, Chunk} messages followed by one
+% {eol, Chunk}; chunks are accumulated in Acc until the eol shows up.
+% Returns a list holding the line, which callers flatten with
+% iolist_to_binary/1. Any other message from the port, or no message
+% within the configured timeout, closes the port and throws
+% {os_process_error, _}.
+readline(#os_proc{} = OsProc) ->
+    readline(OsProc, []).
+readline(#os_proc{port = Port} = OsProc, Acc) ->
+    receive
+    % Partial line while already holding a binary: append to it.
+    {Port, {data, {noeol, Data}}} when is_binary(Acc) ->
+        readline(OsProc, <<Acc/binary,Data/binary>>);
+    % Binary chunk while Acc is not a binary: the chunk becomes Acc.
+    {Port, {data, {noeol, Data}}} when is_binary(Data) ->
+        readline(OsProc, Data);
+    % Non-binary chunk: keep chunks newest-first, reversed on eol below.
+    {Port, {data, {noeol, Data}}} ->
+        readline(OsProc, [Data|Acc]);
+    {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) ->
+        [<<Acc/binary,Data/binary>>];
+    {Port, {data, {eol, Data}}} when is_binary(Data) ->
+        [Data];
+    {Port, {data, {eol, Data}}} ->
+        lists:reverse(Acc, Data);
+    % Anything else from the port (for example an exit status) is an error.
+    {Port, Err} ->
+        catch port_close(Port),
+        throw({os_process_error, Err})
+    after OsProc#os_proc.timeout ->
+        catch port_close(Port),
+        throw({os_process_error, "OS process timed out."})
+    end.
+
+% Standard JSON functions
+% writejson/2 JSON-encodes Data, logs it at debug level and writes it to
+% the OS process as a single line. This is the default writer set in init/1.
+writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
+    JsonData = ?JSON_ENCODE(Data),
+    ?LOG_DEBUG("OS Process ~p Input  :: ~s", [OsProc#os_proc.port, JsonData]),
+    true = writeline(OsProc, JsonData).
+
+% Read one line from the OS process and classify it. This is the default
+% reader set in init/1.
+% Returns {json, Line} with the raw, undecoded line for ordinary output.
+% Lines whose first array element is "log", "error" or "fatal" are decoded:
+%   ["log", Msg]          - Msg is logged and the next line is read;
+%   ["error", Id, Reason] - throws {error, {IdAtom, Reason}};
+%   ["fatal", Id, Reason] - logs, then throws {IdAtom, Reason}.
+% IdAtom comes from couch_util:to_existing_atom/1.
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+    Line = iolist_to_binary(readline(OsProc)),
+    ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
+    try
+        % Don't actually parse the whole JSON. Just try to see if it's
+        % a command or a doc map/reduce/filter/show/list/update output.
+        % If it's a command then parse the whole JSON and execute the
+        % command, otherwise return the raw JSON line to the caller.
+        pick_command(Line)
+    catch
+    throw:abort ->
+        {json, Line};
+    throw:{cmd, _Cmd} ->
+        case ?JSON_DECODE(Line) of
+        [<<"log">>, Msg] when is_binary(Msg) ->
+            % we got a message to log. Log it and continue
+            ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]),
+            readjson(OsProc);
+        [<<"error">>, Id, Reason] ->
+            throw({error, {couch_util:to_existing_atom(Id),Reason}});
+        [<<"fatal">>, Id, Reason] ->
+            ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",
+                [OsProc#os_proc.port, Id, Reason]),
+            throw({couch_util:to_existing_atom(Id),Reason});
+        _Result ->
+            % Starts like a command but has a different shape: plain output.
+            {json, Line}
+        end
+    end.
+
+% Peek at the first parser events of Line to see whether it is a command.
+% Both callbacks always throw: {cmd, Name} when the line is an array
+% whose first element is "log", "error" or "fatal", abort otherwise.
+pick_command(Line) ->
+    json_stream_parse:events(Line, fun pick_command0/1).
+
+% First event: only an array can be a command.
+pick_command0(array_start) ->
+    fun pick_command1/1;
+pick_command0(_) ->
+    throw(abort).
+
+% Second event: the first array element names the command.
+pick_command1(<<"log">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"error">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"fatal">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(_) ->
+    throw(abort).
+
+
+% gen_server API
+% init/1 starts Command through the "couchspawnkillable" helper found in
+% the priv dir and builds the initial state with the JSON reader and
+% writer. Options may override them; each option must be {writer, Fun},
+% {reader, Fun} or {timeout, Integer}, anything else fails the case
+% expression below.
+init([Command, Options, PortOptions]) ->
+    PrivDir = couch_util:priv_dir(),
+    Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
+    BaseProc = #os_proc{
+        command=Command,
+        port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
+        writer=fun writejson/2,
+        reader=fun readjson/1
+    },
+    % The first line read from the port is kept as the command that kills
+    % the external program (presumably printed by the helper - confirm).
+    KillCmd = iolist_to_binary(readline(BaseProc)),
+    Pid = self(),
+    ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]),
+    spawn(fun() ->
+            % this ensures the real os process is killed when this process dies.
+            erlang:monitor(process, Pid),
+            % Any message wakes this process up, normally the 'DOWN' one.
+            receive _ -> ok end,
+            os:cmd(?b2l(iolist_to_binary(KillCmd)))
+        end),
+    OsProc =
+    lists:foldl(fun(Opt, Proc) ->
+        case Opt of
+        {writer, Writer} when is_function(Writer) ->
+            Proc#os_proc{writer=Writer};
+        {reader, Reader} when is_function(Reader) ->
+            Proc#os_proc{reader=Reader};
+        {timeout, TimeOut} when is_integer(TimeOut) ->
+            Proc#os_proc{timeout=TimeOut}
+        end
+    end, BaseProc, Options),
+    {ok, OsProc}.
+
+% Close the port on shutdown. Wrapped in catch so that an error from
+% port_close/1 (for instance on an already closed port) is ignored.
+terminate(_Reason, #os_proc{port=Port}) ->
+    catch port_close(Port),
+    ok.
+
+% {set_timeout, TimeOut}: store the new read timeout.
+% {prompt, Data}: write Data with the configured writer, then reply
+% {ok, Response} using the configured reader. A thrown {error, OsError}
+% is sent back as the bare OsError and the server keeps running; any
+% other throw is sent back as the reply and the server stops normally.
+handle_call({set_timeout, TimeOut}, _From, OsProc) ->
+    {reply, ok, OsProc#os_proc{timeout=TimeOut}};
+handle_call({prompt, Data}, _From, OsProc) ->
+    #os_proc{writer=Writer, reader=Reader} = OsProc,
+    try
+        Writer(OsProc, Data),
+        {reply, {ok, Reader(OsProc)}, OsProc}
+    catch
+        throw:{error, OsError} ->
+            {reply, OsError, OsProc};
+        throw:OtherError ->
+            {stop, normal, OtherError, OsProc}
+    end.
+
+% {send, Data}: write Data with the configured writer and do not read a
+% response; a throw from the writer is logged and stops the server.
+% stop: stop the server normally.
+% Any other cast is logged at debug level and ignored.
+handle_cast({send, Data}, #os_proc{writer=Writer}=OsProc) ->
+    try
+        Writer(OsProc, Data),
+        {noreply, OsProc}
+    catch
+        throw:OsError ->
+            ?LOG_ERROR("Failed sending data: ~p -> ~p", [Data, OsError]),
+            {stop, normal, OsProc}
+    end;
+handle_cast(stop, OsProc) ->
+    {stop, normal, OsProc};
+handle_cast(Msg, OsProc) ->
+    ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]),
+    {noreply, OsProc}.
+
+% Handle raw messages delivered to the server process.
+% An exit status from our own port stops the server: reason normal for
+% status 0, {exit_status, Status} otherwise.
+% Every other message is logged at debug level and dropped. Without this
+% last clause an unexpected message (for example port output arriving
+% after a send/2 cast, which never reads a response) would crash the
+% server with function_clause.
+handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
+    ?LOG_INFO("OS Process terminated normally", []),
+    {stop, normal, OsProc};
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+    ?LOG_ERROR("OS Process died with status: ~p", [Status]),
+    {stop, {exit_status, Status}, OsProc};
+handle_info(Msg, OsProc) ->
+    % Mirrors the unknown-cast clause of handle_cast/2.
+    ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]),
+    {noreply, OsProc}.
+
+% No state migration is performed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_passwords.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_passwords.erl b/apps/couch/src/couch_passwords.erl
new file mode 100644
index 0000000..d9e6836
--- /dev/null
+++ b/apps/couch/src/couch_passwords.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_passwords).
+
+-export([simple/2, pbkdf2/3, pbkdf2/4, verify/2]).
+-export([hash_admin_password/1, get_unhashed_admins/0]).
+
+-include("couch_db.hrl").
+
+-define(MAX_DERIVED_KEY_LENGTH, (1 bsl 32 - 1)).
+-define(SHA1_OUTPUT_LENGTH, 20).
+
+%% legacy scheme, not used for new passwords.
+%% Hashes Password concatenated with Salt using crypto:sha/1 and returns
+%% the result of couch_util:to_hex/1 as a binary.
+-spec simple(binary(), binary()) -> binary().
+simple(Password, Salt) ->
+    ?l2b(couch_util:to_hex(crypto:sha(<<Password/binary, Salt/binary>>))).
+
+%% CouchDB utility functions
+%% hash_admin_password/1 derives a PBKDF2 key for ClearPassword using a
+%% fresh salt from couch_uuids:random/0 and the iteration count from the
+%% "couch_httpd_auth"/"iterations" config value (default "10000").
+%% Returns the binary "-pbkdf2-<DerivedKey>,<Salt>,<Iterations>".
+-spec hash_admin_password(binary()) -> binary().
+hash_admin_password(ClearPassword) ->
+    % Iterations stays a string: it is converted for the derivation and
+    % appended unchanged to the result.
+    Iterations = couch_config:get("couch_httpd_auth", "iterations", "10000"),
+    Salt = couch_uuids:random(),
+    DerivedKey = couch_passwords:pbkdf2(couch_util:to_binary(ClearPassword),
+                                        Salt ,list_to_integer(Iterations)),
+    ?l2b("-pbkdf2-" ++ ?b2l(DerivedKey) ++ ","
+        ++ ?b2l(Salt) ++ ","
+        ++ Iterations).
+
+%% Return the {User, Password} entries of the "admins" config section
+%% whose password does not start with "-hashed-" or "-pbkdf2-", i.e.
+%% those still stored in clear text.
+-spec get_unhashed_admins() -> list().
+get_unhashed_admins() ->
+    lists:filter(
+        fun({_User, "-hashed-" ++ _}) ->
+            false; % already hashed
+        ({_User, "-pbkdf2-" ++ _}) ->
+            false; % already hashed
+        ({_User, _ClearPassword}) ->
+            true
+        end,
+    couch_config:get("admins")).
+
+%% Current scheme, much stronger.
+%% pbkdf2/3 derives a key of ?SHA1_OUTPUT_LENGTH bytes and returns it in
+%% the hex-encoded form produced by pbkdf2/4.
+-spec pbkdf2(binary(), binary(), integer()) -> binary().
+pbkdf2(Password, Salt, Iterations) ->
+    {ok, Result} = pbkdf2(Password, Salt, Iterations, ?SHA1_OUTPUT_LENGTH),
+    Result.
+
+%% Derive DerivedLength bytes of key material and return them as
+%% {ok, HexBinary}. Returns {error, derived_key_too_long} when
+%% DerivedLength exceeds ?MAX_DERIVED_KEY_LENGTH.
+-spec pbkdf2(binary(), binary(), integer(), integer())
+    -> {ok, binary()} | {error, derived_key_too_long}.
+pbkdf2(_Password, _Salt, _Iterations, DerivedLength)
+    when DerivedLength > ?MAX_DERIVED_KEY_LENGTH ->
+    {error, derived_key_too_long};
+pbkdf2(Password, Salt, Iterations, DerivedLength) ->
+    % Number of SHA-1 sized blocks needed to cover DerivedLength bytes.
+    L = ceiling(DerivedLength / ?SHA1_OUTPUT_LENGTH),
+    % Concatenate the blocks and keep only the first DerivedLength bytes.
+    <<Bin:DerivedLength/binary,_/binary>> =
+        iolist_to_binary(pbkdf2(Password, Salt, Iterations, L, 1, [])),
+    {ok, ?l2b(couch_util:to_hex(Bin))}.
+
+%% Compute blocks BlockIndex..BlockCount in order and return them as a
+%% list. Acc holds the finished blocks newest-first and is reversed
+%% at the end.
+-spec pbkdf2(binary(), binary(), integer(), integer(), integer(), iolist())
+    -> iolist().
+pbkdf2(_Password, _Salt, _Iterations, BlockCount, BlockIndex, Acc)
+    when BlockIndex > BlockCount ->
+    lists:reverse(Acc);
+pbkdf2(Password, Salt, Iterations, BlockCount, BlockIndex, Acc) ->
+    Block = pbkdf2(Password, Salt, Iterations, BlockIndex, 1, <<>>, <<>>),
+    pbkdf2(Password, Salt, Iterations, BlockCount, BlockIndex + 1, [Block|Acc]).
+
+%% Compute a single block. Iteration 1 is the MAC of the salt followed
+%% by the 32-bit block index; every later iteration is the MAC of the
+%% previous result, XORed into Acc. Returns Acc once Iteration exceeds
+%% Iterations.
+-spec pbkdf2(binary(), binary(), integer(), integer(), integer(),
+    binary(), binary()) -> binary().
+pbkdf2(_Password, _Salt, Iterations, _BlockIndex, Iteration, _Prev, Acc)
+    when Iteration > Iterations ->
+    Acc;
+pbkdf2(Password, Salt, Iterations, BlockIndex, 1, _Prev, _Acc) ->
+    InitialBlock = crypto:sha_mac(Password,
+        <<Salt/binary,BlockIndex:32/integer>>),
+    % The first result seeds both the chain and the running XOR.
+    pbkdf2(Password, Salt, Iterations, BlockIndex, 2,
+        InitialBlock, InitialBlock);
+pbkdf2(Password, Salt, Iterations, BlockIndex, Iteration, Prev, Acc) ->
+    Next = crypto:sha_mac(Password, Prev),
+    pbkdf2(Password, Salt, Iterations, BlockIndex, Iteration + 1,
+                   Next, crypto:exor(Next, Acc)).
+
+%% verify two lists for equality without short-circuits to avoid timing attacks.
+%% Result accumulates the OR of the XOR of every element pair, so it is
+%% 0 at the end only if all pairs were equal. Both lists must have the
+%% same length; verify/2 checks that before calling.
+-spec verify(string(), string(), integer()) -> boolean().
+verify([X|RestX], [Y|RestY], Result) ->
+    verify(RestX, RestY, (X bxor Y) bor Result);
+verify([], [], Result) ->
+    Result == 0.
+
+%% Compare two binaries or two lists for equality. Binaries are converted
+%% to lists first. Lists of different length return false immediately;
+%% lists of equal length are compared by verify/3 without short-circuit.
+%% Any other combination of arguments returns false.
+-spec verify(binary(), binary()) -> boolean();
+            (list(), list()) -> boolean().
+verify(<<X/binary>>, <<Y/binary>>) ->
+    verify(?b2l(X), ?b2l(Y));
+verify(X, Y) when is_list(X) and is_list(Y) ->
+    case length(X) == length(Y) of
+        true ->
+            verify(X, Y, 0);
+        false ->
+            false
+    end;
+verify(_X, _Y) -> false.
+
+%% Smallest integer that is not less than X.
+-spec ceiling(number()) -> integer().
+ceiling(X) ->
+    Truncated = erlang:trunc(X),
+    if
+        % Truncation dropped a positive fraction: round up.
+        X > Truncated -> Truncated + 1;
+        % Exact integer, or a negative number truncated towards zero.
+        true -> Truncated
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_primary_sup.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_primary_sup.erl b/apps/couch/src/couch_primary_sup.erl
new file mode 100644
index 0000000..150b92e
--- /dev/null
+++ b/apps/couch/src/couch_primary_sup.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_primary_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+% Start the supervisor, registered locally as couch_primary_services.
+start_link() ->
+    supervisor:start_link({local,couch_primary_services}, ?MODULE, []).
+
+% Supervisor callback. Each child is given as the tuple
+% {Id, {Module, Function, Args}, Restart, Shutdown, Type, Modules}.
+% All children are permanent; the strategy is one_for_one with at most
+% 10 restarts within 3600 seconds.
+init([]) ->
+    Children = [
+        {collation_driver,
+            {couch_drv, start_link, []},
+            permanent,
+            infinity,
+            supervisor,
+            [couch_drv]},
+        {couch_task_status,
+            {couch_task_status, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_task_status]},
+        {couch_server,
+            {couch_server, sup_start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_server]},
+        % gen_event manager registered locally as couch_db_update
+        {couch_db_update_event,
+            {gen_event, start_link, [{local, couch_db_update}]},
+            permanent,
+            brutal_kill,
+            worker,
+            dynamic},
+        % gen_event manager registered locally as couch_replication
+        {couch_replication_event,
+            {gen_event, start_link, [{local, couch_replication}]},
+            permanent,
+            brutal_kill,
+            worker,
+            dynamic},
+        {couch_replicator_job_sup,
+            {couch_replicator_job_sup, start_link, []},
+            permanent,
+            infinity,
+            supervisor,
+            [couch_replicator_job_sup]},
+        {couch_log,
+            {couch_log, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_log]}
+    ],
+    {ok, {{one_for_one, 10, 3600}, Children}}.
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_query_servers.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl
new file mode 100644
index 0000000..3b58cbe
--- /dev/null
+++ b/apps/couch/src/couch_query_servers.erl
@@ -0,0 +1,616 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_query_servers).
+-behaviour(gen_server).
+
+-export([start_link/0, config_change/1]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([filter_docs/5]).
+-export([filter_view/3]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
+% For 210-os-proc-pool.t
+-export([get_os_process/1, ret_os_process/1]).
+
+-include("couch_db.hrl").
+
+% One query server process as handed out to callers.
+-record(proc, {
+    pid,             % pid of the query server process
+    lang,            % language name the process was started for
+    ddoc_keys = [],  % keys compared against DDocKey in proc_with_ddoc/3
+    prompt_fun,      % {Module, prompt}, set in new_process/3
+    set_timeout_fun, % {Module, set_timeout}, set in new_process/3
+    stop_fun         % {Module, stop}, set in new_process/3
+}).
+
+% State of the couch_query_servers gen_server. The first four fields
+% are ets tables created in init/1.
+-record(qserver, {
+    langs, % Keyed by language name, value is {Mod,Func,Arg}
+    pid_procs, % Keyed by PID, value is a #proc record.
+    lang_procs, % Keyed by language name, value is a list of #proc records
+    lang_limits, % Keyed by language name, value is {Lang, Limit, Current}
+    waitlist = [], % {Info, From} entries, newest first
+    config % EJSON object returned to callers together with a proc
+}).
+
+% Start the server, registered locally as couch_query_servers.
+start_link() ->
+    gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
+
+% Obtain a process for Lang and load it with the given map functions.
+% Lib is sent with "add_lib" unless it is the empty object {[]}; each
+% function source is then sent with "add_fun". Every prompt must answer
+% true. Returns {ok, Proc}; the caller hands it back with stop_doc_map/1.
+start_doc_map(Lang, Functions, Lib) ->
+    Proc = get_os_process(Lang),
+    case Lib of
+    {[]} -> ok;
+    Lib ->
+        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
+    end,
+    lists:foreach(fun(FunctionSource) ->
+        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
+    end, Functions),
+    {ok, Proc}.
+
+% Send every document to Proc with "map_doc" and collect the results.
+% Returns {ok, Results} with one entry per document; each entry holds one
+% list of {Key, Value} tuples per map function.
+map_docs(Proc, Docs) ->
+    % send the documents
+    Results = lists:map(
+        fun(Doc) ->
+            Json = couch_doc:to_json_obj(Doc, []),
+
+            FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]),
+            % the results are a json array of function map yields like this:
+            % [FunResults1, FunResults2 ...]
+            % where each FunResults is a json array of key value pairs:
+            % [[Key1, Value1], [Key2, Value2]]
+            % Convert the key, value pairs to tuples like
+            % [{Key1, Value1}, {Key2, Value2}]
+            lists:map(
+                fun(FunRs) ->
+                    [list_to_tuple(FunResult) || FunResult <- FunRs]
+                end,
+            FunsResults)
+        end,
+        Docs),
+    {ok, Results}.
+
+% Map a single document and return {ok, Raw}, where Raw is whatever
+% proc_prompt_raw/2 yields for the "map_doc" command.
+map_doc_raw(Proc, Doc) ->
+    Json = couch_doc:to_json_obj(Doc, []),
+    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
+
+
+% Hand a map process back via ret_os_process/1; nil means nothing to return.
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map(Proc) ->
+    ok = ret_os_process(Proc).
+
+% Transpose a list of equally long rows: the first result list holds the
+% first element of every row, the next one the second elements, and so on.
+% Each pass prepends while folding, so element order within the result
+% lists flips from one column to the next.
+group_reductions_results([]) ->
+    [];
+group_reductions_results(Rows) ->
+    Firsts = lists:foldl(fun([First|_], Acc) -> [First|Acc] end, [], Rows),
+    Remainders = lists:foldl(fun([_|Rest], Acc) -> [Rest|Acc] end, [], Rows),
+    case Remainders of
+    [[]|_] ->
+        % the rows are used up, this was the last column
+        [Firsts];
+    _ ->
+        [Firsts | group_reductions_results(Remainders)]
+    end.
+
+% Re-reduce previously reduced values, one reduce function at a time.
+% ReducedValues is regrouped so that each reduce source is paired with its
+% own values. Sources starting with "_" are handled by builtin_reduce/4,
+% all others are sent to an OS process. Returns {ok, Results}.
+rereduce(_Lang, [], _ReducedValues) ->
+    {ok, []};
+rereduce(Lang, RedSrcs, ReducedValues) ->
+    Grouped = group_reductions_results(ReducedValues),
+    Results = lists:zipwith(
+        fun
+        (<<"_", _/binary>> = FunSrc, Values) ->
+            % builtins take [Key, Value] rows, so wrap each value with an empty key
+            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
+            Result;
+        (FunSrc, Values) ->
+            os_rereduce(Lang, [FunSrc], Values)
+        end, RedSrcs, Grouped),
+    {ok, Results}.
+
+% Reduce KVs with every function in RedSrcs. Sources starting with "_"
+% run as builtins, the rest in one OS process call; the two result lists
+% are then merged back into the order of RedSrcs.
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    {OsRedSrcs, BuiltinReds} = lists:partition(fun
+        (<<"_", _/binary>>) -> false;
+        (_OsFun) -> true
+    end, RedSrcs),
+    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
+    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
+
+% Walk RedSrcs and take the next result from the builtin list for sources
+% starting with "_", otherwise from the OS list. Returns {ok, Results} in
+% the order of RedSrcs; all three lists must run out together.
+recombine_reduce_results([], [], [], Acc) ->
+    {ok, lists:reverse(Acc)};
+recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
+recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
+
+% Run the "reduce" command for OsRedSrcs over KVs in an OS process and
+% return {ok, Reductions}. The process is handed back in the after
+% section, so it is returned even when the prompt fails.
+os_reduce(_Lang, [], _KVs) ->
+    {ok, []};
+os_reduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
+        [true, Reductions] -> Reductions
+    after
+        ok = ret_os_process(Proc)
+    end,
+    {ok, OsResults}.
+
+% Run the "rereduce" command in an OS process and return the single
+% reduction it yields. The process is always handed back afterwards.
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+        [true, [Reduction]] -> Reduction
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+
+% Evaluate the builtin reduce functions named in the second argument over
+% KVs, in order. Re is reduce or rereduce. Names are matched by prefix:
+% "_sum", "_count" and "_stats". On rereduce, "_count" sums the earlier
+% counts instead of counting rows. Returns {ok, Results}.
+builtin_reduce(_Re, [], _KVs, Acc) ->
+    {ok, lists:reverse(Acc)};
+builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Sum = builtin_sum_rows(KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Sum|Acc]);
+builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = length(KVs),
+    builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = builtin_sum_rows(KVs),
+    builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Stats = builtin_stats(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]).
+
+% Sum the values of [Key, Value] rows, starting from 0. Values may be
+% numbers or lists of numbers. Once a list is seen the running total
+% becomes a list and is added element by element with sum_terms/2, a
+% plain number counting as a one-element list.
+% Throws {invalid_value, Message} for any other row.
+builtin_sum_rows(KVs) ->
+    lists:foldl(fun
+        ([_Key, Value], Acc) when is_number(Value), is_number(Acc) ->
+            Acc + Value;
+        ([_Key, Value], Acc) when is_list(Value), is_list(Acc) ->
+            sum_terms(Acc, Value);
+        ([_Key, Value], Acc) when is_number(Value), is_list(Acc) ->
+            sum_terms(Acc, [Value]);
+        ([_Key, Value], Acc) when is_list(Value), is_number(Acc) ->
+            sum_terms([Acc], Value);
+        (_Else, _Acc) ->
+            throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>})
+    end, 0, KVs).
+
+% Add two lists of numbers position by position. When one list is longer,
+% its remaining tail is appended unchanged. Throws {invalid_value, Message}
+% for anything that is not a number or a list.
+sum_terms(Left, []) when is_list(Left) ->
+    % right side used up: the rest of the left side (possibly []) stays as is
+    Left;
+sum_terms([], Right) when is_list(Right) ->
+    Right;
+sum_terms([L|Left], [R|Right]) when is_number(L), is_number(R) ->
+    [L+R | sum_terms(Left, Right)];
+sum_terms(_, _) ->
+    throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}).
+
+% Compute the builtin _stats reduction over [Key, Value] rows.
+% reduce: values must be numbers. Returns an EJSON object with sum, count,
+%   min, max and sumsqr, or the empty object for no rows. Throws
+%   {invalid_value, Message} for a non-numeric value. This includes the
+%   first row, which used to fail with function_clause instead.
+% rereduce: values are objects produced by an earlier _stats call and are
+%   merged field by field.
+builtin_stats(reduce, []) ->
+    {[]};
+builtin_stats(reduce, [[_,First]|Rest]) when is_number(First) ->
+    % the first value seeds the accumulator {Sum, Count, Min, Max, SumSqr}
+    Stats = lists:foldl(fun([_K,V], {S,C,Mi,Ma,Sq}) when is_number(V) ->
+        {S+V, C+1, lists:min([Mi, V]), lists:max([Ma, V]), Sq+(V*V)};
+    (_, _) ->
+        throw({invalid_value,
+            <<"builtin _stats function requires map values to be numbers">>})
+    end, {First,1,First,First,First*First}, Rest),
+    {Sum, Cnt, Min, Max, Sqr} = Stats,
+    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]};
+builtin_stats(reduce, [[_,_NotANumber]|_]) ->
+    % report a bad first value the same way as a bad later one
+    throw({invalid_value,
+        <<"builtin _stats function requires map values to be numbers">>});
+
+builtin_stats(rereduce, [[_,First]|Rest]) ->
+    {[{sum,Sum0}, {count,Cnt0}, {min,Min0}, {max,Max0}, {sumsqr,Sqr0}]} = First,
+    Stats = lists:foldl(fun([_K,Red], {S,C,Mi,Ma,Sq}) ->
+        {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]} = Red,
+        {Sum+S, Cnt+C, lists:min([Min, Mi]), lists:max([Max, Ma]), Sqr+Sq}
+    end, {Sum0,Cnt0,Min0,Max0,Sqr0}, Rest),
+    {Sum, Cnt, Min, Max, Sqr} = Stats,
+    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]}.
+
+% use the function stored in ddoc.validate_doc_update to test an update.
+% Returns ok when the function answers 1. Throws {forbidden, Message} or
+% {unauthorized, Message} when it answers the corresponding object.
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+    JsonDiskDoc = json_doc(DiskDoc),
+    case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]) of
+        1 ->
+            ok;
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message})
+    end.
+
+% Convert a document to its JSON object including revs; nil maps to null.
+json_doc(nil) -> null;
+json_doc(Doc) ->
+    couch_doc:to_json_obj(Doc, [revs]).
+
+% Run the map function of view VName in DDoc over Docs through the "ddoc"
+% command and return {ok, Passes} as answered by the query server.
+filter_view(DDoc, VName, Docs) ->
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
+    {ok, Passes}.
+
+% Run filter function FName of DDoc over Docs and return {ok, Passes}.
+% Req is either {json_req, JsonObj}, used as is, or an #httpd{} record,
+% converted with couch_httpd_external:json_req_obj/2.
+filter_docs(Req, Db, DDoc, FName, Docs) ->
+    JsonReq = case Req of
+    {json_req, JsonObj} ->
+        JsonObj;
+    #httpd{} = HttpReq ->
+        couch_httpd_external:json_req_obj(HttpReq, Db)
+    end,
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
+        [JsonDocs, JsonReq]),
+    {ok, Passes}.
+
+% Send the "ddoc" command for function path FunPath of design doc DDocId,
+% with Args, to a process the caller has already obtained.
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+    proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
+
+% Borrow a process for DDoc, send it the "ddoc" command for FunPath with
+% Args, and return the answer. with_ddoc_proc/2 hands the process back.
+ddoc_prompt(DDoc, FunPath, Args) ->
+    Prompt = fun(ProcAndId) ->
+        ddoc_proc_prompt(ProcAndId, FunPath, Args)
+    end,
+    with_ddoc_proc(DDoc, Prompt).
+
+% Run Fun({Proc, DDocId}) with a process obtained for this design doc.
+% The process is looked up by the key {DDocId, Rev}, where Rev is built
+% from the first revision in the doc's revs. It is handed back in the
+% after section, so it is returned even when Fun fails.
+with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+    Rev = couch_doc:rev_to_str({Start, DiskRev}),
+    DDocKey = {DDocId, Rev},
+    Proc = get_ddoc_process(DDoc, DDocKey),
+    try Fun({Proc, DDocId})
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+% gen_server callback. Creates the private ets tables, loads the
+% configured languages and their process limits, and builds the config
+% object that is returned to callers together with every process.
+init([]) ->
+    % register async to avoid deadlock on restart_child
+    Self = self(),
+    spawn(couch_config, register, [fun ?MODULE:config_change/1, Self]),
+
+    Langs = ets:new(couch_query_server_langs, [set, private]),
+    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
+    PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
+    LangProcs = ets:new(couch_query_server_procs, [set, private]),
+
+    ProcTimeout = list_to_integer(couch_config:get(
+                        "couchdb", "os_process_timeout", "5000")),
+    ReduceLimit = list_to_atom(
+        couch_config:get("query_server_config","reduce_limit","true")),
+    OsProcLimit = list_to_integer(
+        couch_config:get("query_server_config","os_process_limit","10")),
+
+    % 'query_servers' specifies an OS command-line to execute.
+    lists:foreach(fun({Lang, Command}) ->
+        true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
+        true = ets:insert(Langs, {?l2b(Lang),
+                          couch_os_process, start_link, [Command]})
+    end, couch_config:get("query_servers")),
+    % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
+    lists:foreach(fun({Lang, SpecStr}) ->
+        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+        true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
+        true = ets:insert(Langs, {?l2b(Lang),
+                          Mod, Fun, SpecArg})
+    end, couch_config:get("native_query_servers")),
+
+
+    process_flag(trap_exit, true),
+    {ok, #qserver{
+        langs = Langs, % Keyed by language name, value is {Mod,Func,Arg}
+        pid_procs = PidProcs, % Keyed by PID, value is a #proc record.
+        lang_procs = LangProcs, % Keyed by language name, value is a list of #proc records
+        lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current}
+        config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
+    }}.
+
+% Shut down every process recorded in the pid_procs table.
+terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
+    [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
+    ok.
+
+% {get_proc, DDoc, DDocKey}: reply {ok, Proc, Config} with a process for
+%   the design doc's language that knows (or is taught) the design doc.
+% {get_proc, Lang}: reply {ok, Proc, Config} with any process for Lang.
+%   In both cases the caller is queued without a reply when lang_proc/3
+%   answers wait; any other result is sent back as is.
+% {unlink_proc, Pid}: unlink this server from Pid.
+% {ret_proc, Proc}: take a process back if it is still alive, then try to
+%   serve the oldest waiting caller.
+handle_call({get_proc, DDoc1, DDocKey}, From, Server) ->
+    #doc{body = {Props}} = DDoc = couch_doc:with_ejson_body(DDoc1),
+    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)};
+    Error ->
+        {reply, Error, Server}
+    end;
+handle_call({get_proc, Lang}, From, Server) ->
+    case lang_proc(Lang, Server, fun([P|_Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({Lang}, From, Server)};
+    Error ->
+        {reply, Error, Server}
+    end;
+handle_call({unlink_proc, Pid}, _From, Server) ->
+    unlink(Pid),
+    {reply, ok, Server};
+handle_call({ret_proc, Proc}, _From, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs}=Server) ->
+    % Along with max process limit, here we should check
+    % if we're over the limit and discard when we are.
+    case is_process_alive(Proc#proc.pid) of
+        true ->
+            add_value(PidProcs, Proc#proc.pid, Proc),
+            add_to_list(LangProcs, Proc#proc.lang, Proc),
+            link(Proc#proc.pid);
+        false ->
+            ok
+    end,
+    {reply, true, service_waitlist(Server)}.
+
+% No casts are used by this server; anything received is ignored.
+handle_cast(_Whatever, Server) ->
+    {noreply, Server}.
+
+% 'EXIT' messages (the server traps exits) are ignored.
+% 'DOWN' for a known process: remove it from both tables, decrement the
+% language's current count and try to serve a waiting caller.
+% 'DOWN' for an unknown process: ignored when the reason is normal,
+% otherwise the server stops with that reason.
+handle_info({'EXIT', _, _}, Server) ->
+    {noreply, Server};
+handle_info({'DOWN', _, process, Pid, Status}, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}=Server) ->
+    case ets:lookup(PidProcs, Pid) of
+    [{Pid, Proc}] ->
+        case Status of
+        normal -> ok;
+        _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
+        end,
+        rem_value(PidProcs, Pid),
+        % the proc may be checked out and thus absent from the list; ignore errors
+        catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
+        [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
+        true = ets:insert(LangLimits, {Lang, Lim, Current-1}),
+        {noreply, service_waitlist(Server)};
+    [] ->
+        case Status of
+        normal ->
+            {noreply, Server};
+        _ ->
+            {stop, Status, Server}
+        end
+    end.
+
+% No state migration is performed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Config callback registered in init/1. A change in any of the three
+% sections that affect this server restarts it through the
+% couch_secondary_services supervisor.
+config_change(Section) when Section =:= "query_servers";
+                            Section =:= "native_query_servers";
+                            Section =:= "query_server_config" ->
+    supervisor:terminate_child(couch_secondary_services, query_servers),
+    supervisor:restart_child(couch_secondary_services, query_servers).
+
+% Private API
+
+% Queue a caller that could not be given a process. New entries are
+% prepended, so the oldest entry sits at the end of the list.
+add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) ->
+    Server#qserver{waitlist=[{Info, From}|Waitlist]}.
+
+% Try to serve the oldest waiting caller, at most one per invocation.
+% The entry is removed once it has been answered and kept when
+% service_waiting/2 says wait.
+service_waitlist(#qserver{waitlist=[]}=Server) ->
+    Server;
+service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
+    % the list is newest-first, so reverse it to reach the oldest entry
+    [Oldest|RevWList] = lists:reverse(Waitlist),
+    case service_waiting(Oldest, Server) of
+    ok ->
+        Server#qserver{waitlist=lists:reverse(RevWList)};
+    wait ->
+        Server#qserver{waitlist=Waitlist}
+    end.
+
+% todo get rid of duplication
+% Answer one queued caller, using the same lookup as the matching
+% get_proc clause of handle_call/3. Returns ok once a reply (a process
+% or an error) has been sent to From, or wait when still none is
+% available.
+service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
+    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end;
+service_waiting({{Lang}, From}, Server) ->
+    case lang_proc(Lang, Server, fun([P|_Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end.
+
+% Get a process for Lang. When idle processes exist, PickFun chooses one
+% from the list and it is removed from the idle list. Otherwise a new
+% process is started, recorded in pid_procs and passed to PickFun on its
+% own. Returns {ok, Proc}, wait when the language limit is reached, or
+% whatever new_process/3 returned or raised (it is called under catch).
+lang_proc(Lang, #qserver{
+        langs=Langs,
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}, PickFun) ->
+    % Note to future self. Add max process limit.
+    case ets:lookup(LangProcs, Lang) of
+    [{Lang, [P|Procs]}] ->
+        {ok, Proc} = PickFun([P|Procs]),
+        rem_from_list(LangProcs, Lang, Proc),
+        {ok, Proc};
+    _ ->
+        case (catch new_process(Langs, LangLimits, Lang)) of
+        {ok, Proc} ->
+            add_value(PidProcs, Proc#proc.pid, Proc),
+            PickFun([Proc]);
+        ErrorOrWait ->
+            ErrorOrWait
+        end
+    end.
+
+% Start a new query server process for Lang unless its limit is reached.
+% Returns {ok, #proc{}}, wait (limit reached) or {unknown_query_language, Lang}
+% when Lang has no entry in the limits table or in the langs table.
+new_process(Langs, LangLimits, Lang) ->
+    case {ets:lookup(LangLimits, Lang), ets:lookup(Langs, Lang)} of
+    {[{Lang, Lim, Current}], [{Lang, Mod, Func, Arg}]}
+            when Lim == 0 orelse Current < Lim -> % Lim == 0 means no limit
+        {ok, Pid} = apply(Mod, Func, Arg),
+        erlang:monitor(process, Pid),
+        true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
+        {ok, #proc{lang=Lang,
+                   pid=Pid,
+                   % Called via proc_prompt, proc_set_timeout, and proc_stop
+                   prompt_fun={Mod, prompt},
+                   set_timeout_fun={Mod, set_timeout},
+                   stop_fun={Mod, stop}}};
+    {[{Lang, _Lim, _Current}], [{Lang, _Mod, _Func, _Arg}]} ->
+        wait;
+    _ ->
+        {unknown_query_language, Lang}
+    end.
+
+% Pick a proc that already has DDocKey among its ddoc_keys; when none has,
+% teach the ddoc to the first proc of LangProcs. Returns {ok, Proc}.
+proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
+    KnowsDDoc = fun(#proc{ddoc_keys=Keys}) ->
+        lists:any(fun(Key) -> Key == DDocKey end, Keys)
+    end,
+    case lists:filter(KnowsDDoc, LangProcs) of
+    [DDocProc|_] ->
+        ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
+        {ok, DDocProc};
+    [] ->
+        [TeachProc|_] = LangProcs,
+        ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
+        {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc),
+        {ok, SmartProc}
+    end.
+
+% Prompt the query server process with Args and return its answer as EJSON.
+% The raw answer comes from proc_prompt_raw/2; raw_to_ejson/1 decodes a
+% {json, Json} answer with ?JSON_DECODE and returns anything else untouched,
+% so the decoding rule lives in one place.
+proc_prompt(Proc, Args) ->
+     RawAnswer = proc_prompt_raw(Proc, Args),
+     raw_to_ejson(RawAnswer).
+
+proc_prompt_raw(#proc{prompt_fun = {Mod, Func}, pid = Pid}, Args) ->
+    Mod:Func(Pid, Args). % call the proc's prompt callback with its pid
+
+% Convert a raw prompt answer to EJSON: {json, Json} is decoded with
+% ?JSON_DECODE, any other term is returned unchanged.
+raw_to_ejson({json, Encoded}) -> ?JSON_DECODE(Encoded);
+raw_to_ejson(Decoded) -> Decoded.
+
+proc_stop(Proc) -> % invoke the proc's stop_fun callback with its pid
+    {Mod, Func} = Proc#proc.stop_fun,
+    Mod:Func(Proc#proc.pid).
+
+proc_set_timeout(Proc, Timeout) -> % invoke the proc's set_timeout_fun callback
+    {Mod, Func} = Proc#proc.set_timeout_fun,
+    Mod:Func(Proc#proc.pid, Timeout).
+
+% Teach DDoc to the query server process and remember DDocKey on the proc.
+% The ddoc is sent under its id only, so the query server keeps just one
+% copy per ddoc id; keys of other revisions of DDocId are therefore removed
+% from the proc before DDocKey is added. Returns {ok, UpdatedProc}.
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+    DDocJson = couch_doc:to_json_obj(DDoc, []),
+    % the prompt must answer true, anything else is a badmatch
+    true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, DDocJson]),
+    OtherKeys = [Key || {Id, _}=Key <- Keys, Id /= DDocId],
+    {ok, Proc#proc{ddoc_keys=[DDocKey|OtherKeys]}}.
+
+% Check out a query server process for the design document DDoc/DDocKey.
+% Returns the #proc{} or throws whatever error couch_query_servers answered.
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+    checkout_proc({get_proc, DDoc, DDocKey}).
+
+% Check out any query server process for the language Lang.
+% Returns the #proc{} or throws whatever error couch_query_servers answered.
+get_os_process(Lang) ->
+    checkout_proc({get_proc, Lang}).
+
+% Shared implementation of get_ddoc_process/2 and get_os_process/1.
+% Asks couch_query_servers for a proc with GetProcMsg and resets it with the
+% query config. On a successful reset the timeout from the config is applied,
+% the proc is linked to the calling process and couch_query_servers is sent
+% unlink_proc for its pid. If the reset does not answer true the proc is
+% stopped and the whole checkout is retried with a fresh request.
+% Both calls to couch_query_servers use an infinite timeout, so this blocks
+% until the server answers.
+checkout_proc(GetProcMsg) ->
+    case gen_server:call(couch_query_servers, GetProcMsg, infinity) of
+    {ok, Proc, {QueryConfig}} ->
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            link(Proc#proc.pid),
+            gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}, infinity),
+            Proc;
+        _ ->
+            % reset failed or crashed: discard this proc and try again
+            catch proc_stop(Proc),
+            checkout_proc(GetProcMsg)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+ret_os_process(Proc) -> % hand Proc back via ret_proc, then drop our link to its pid
+    true = gen_server:call(couch_query_servers, {ret_proc, Proc}, infinity),
+    _ = (catch unlink(Proc#proc.pid)),
+    ok.
+
+add_value(Tab, Key, Value) -> % store the {Key, Value} object in table Tab
+    Object = {Key, Value}, true = ets:insert(Tab, Object).
+
+rem_value(Tab, Key) -> % delete whatever table Tab holds under Key
+    true = ets:delete(Tab, Key).
+
+% Prepend Value to the list stored under Key, starting a new list if needed.
+add_to_list(Tid, Key, Value) ->
+    Existing = case ets:lookup(Tid, Key) of
+        [{Key, Vals}] -> Vals;
+        [] -> []
+    end,
+    true = ets:insert(Tid, {Key, [Value | Existing]}).
+
+% Remove Value from the list stored under Key in table Tid.
+% A #proc{} value is matched by pid only, and list elements that are not
+% #proc{} records are dropped as well; any other value is compared with /=.
+% Returns the result of ets:insert/2, or ok when Key has no entry.
+rem_from_list(Tid, Key, Value) when is_record(Value, proc)->
+    Pid = Value#proc.pid,
+    KeepProc = fun(#proc{pid=P}) -> P /= Pid; (_NotAProc) -> false end,
+    store_filtered(Tid, Key, KeepProc);
+rem_from_list(Tid, Key, Value) ->
+    store_filtered(Tid, Key, fun(Val) -> Val /= Value end).
+
+store_filtered(Tid, Key, Keep) ->
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        ets:insert(Tid, {Key, [Val || Val <- Vals, Keep(Val)]});
+    [] -> ok
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_ref_counter.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_ref_counter.erl b/apps/couch/src/couch_ref_counter.erl
new file mode 100644
index 0000000..a774f46
--- /dev/null
+++ b/apps/couch/src/couch_ref_counter.erl
@@ -0,0 +1,111 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ref_counter).
+-behaviour(gen_server).
+
+% Reference counter: a gen_server that monitors every referring process and
+% stops (shutting down its child processes) once it monitors nobody.
+
+-export([start/1, drop/1, drop/2, add/1, add/2, count/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+% server state
+-record(srv,
+    {
+    referrers=dict:new(), % Pid -> {MonitorRef, RefCount} for each referrer
+    child_procs=[]        % pids linked in init/1 and shut down in terminate/2
+    }).
+
+% Start an unlinked counter; the caller becomes the first referrer.
+start(ChildProcs) ->
+    gen_server:start(?MODULE, {self(), ChildProcs}, []).
+
+% Release one reference held by Pid (defaults to the calling process).
+drop(RefCounterPid) ->
+    drop(RefCounterPid, self()).
+
+drop(RefCounterPid, Pid) ->
+    gen_server:call(RefCounterPid, {drop, Pid}, infinity).
+
+% Take one more reference on behalf of Pid (defaults to the calling process).
+add(RefCounterPid) ->
+    add(RefCounterPid, self()).
+
+add(RefCounterPid, Pid) ->
+    gen_server:call(RefCounterPid, {add, Pid}, infinity).
+
+% Number of monitors the counter process currently holds.
+count(RefCounterPid) ->
+    gen_server:call(RefCounterPid, count).
+
+% gen_server callbacks
+
+init({Pid, ChildProcs}) ->
+    _ = [link(ChildProc) || ChildProc <- ChildProcs],
+    FirstRef = erlang:monitor(process, Pid),
+    Srv = #srv{referrers=dict:from_list([{Pid, {FirstRef, 1}}]), child_procs=ChildProcs},
+    {ok, Srv}.
+
+terminate(_Reason, #srv{child_procs=ChildProcs}) ->
+    _ = [couch_util:shutdown_sync(Pid) || Pid <- ChildProcs],
+    ok.
+
+handle_call({add, Pid}, _From, #srv{referrers=Referrers}=Srv) ->
+    Entry =
+    case dict:find(Pid, Referrers) of
+    {ok, {MonRef, RefCnt}} -> {MonRef, RefCnt + 1};
+    error -> {erlang:monitor(process, Pid), 1}
+    end,
+    {reply, ok, Srv#srv{referrers=dict:store(Pid, Entry, Referrers)}};
+handle_call(count, _From, Srv) ->
+    {monitors, Monitors} = process_info(self(), monitors),
+    {reply, length(Monitors), Srv};
+handle_call({drop, Pid}, _From, #srv{referrers=Referrers}=Srv) ->
+    Srv2 = Srv#srv{referrers=drop_ref(Pid, Referrers)},
+    case should_close() of
+    true -> {stop, normal, ok, Srv2};
+    false -> {reply, ok, Srv2}
+    end.
+
+handle_cast(Msg, _Srv) ->
+    exit({unknown_msg,Msg}).
+
+handle_info({'DOWN', MonRef, _, Pid, _}, #srv{referrers=Referrers}=Srv) ->
+    % the dead process must be a known referrer with this very monitor ref
+    {ok, {MonRef, _RefCount}} = dict:find(Pid, Referrers),
+    Srv2 = Srv#srv{referrers=dict:erase(Pid, Referrers)},
+    case should_close() of
+    true -> {stop, normal, Srv2};
+    false -> {noreply, Srv2}
+    end.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% internal helpers
+
+% Remove one reference of Pid; the last one also removes its monitor.
+drop_ref(Pid, Referrers) ->
+    case dict:find(Pid, Referrers) of
+    {ok, {MonRef, 1}} ->
+        erlang:demonitor(MonRef, [flush]),
+        dict:erase(Pid, Referrers);
+    {ok, {MonRef, Num}} ->
+        dict:store(Pid, {MonRef, Num-1}, Referrers);
+    error ->
+        Referrers
+    end.
+
+% True once this process no longer monitors anything.
+should_close() ->
+    process_info(self(), monitors) =:= {monitors, []}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/couch_secondary_sup.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/couch_secondary_sup.erl b/apps/couch/src/couch_secondary_sup.erl
new file mode 100644
index 0000000..6dd5604
--- /dev/null
+++ b/apps/couch/src/couch_secondary_sup.erl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_secondary_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+% Supervisor for the secondary services. It is registered locally under the
+% name couch_secondary_services.
+start_link() ->
+    supervisor:start_link({local,couch_secondary_services}, ?MODULE, []).
+
+% one_for_one supervision (at most 10 restarts per 3600 seconds) of the two
+% built-in children plus one worker per entry of the "daemons" config section.
+init([]) ->
+    Children = builtin_children() ++ daemon_children(),
+    {ok, {{one_for_one, 10, 3600}, Children}}.
+
+% Child specs that do not depend on the configuration.
+builtin_children() ->
+    UpdateNotifierSup = {couch_db_update_notifier_sup,
+        {couch_db_update_notifier_sup, start_link, []},
+        permanent, infinity, supervisor, [couch_db_update_notifier_sup]},
+    PluginEvent = {couch_plugin_event,
+        {gen_event, start_link, [{local, couch_plugin}]},
+        permanent, brutal_kill, worker, dynamic},
+    [UpdateNotifierSup, PluginEvent].
+
+% One child spec per {Name, SpecStr} pair of the "daemons" section; pairs with
+% an empty SpecStr are skipped.
+daemon_children() ->
+    [daemon_child(Name, SpecStr)
+        || {Name, SpecStr} <- couch_config:get("daemons"), SpecStr /= ""].
+
+% SpecStr has to parse (couch_util:parse_term/1) into {Module, Fun, Args};
+% Name is turned into the atom used as child id.
+daemon_child(Name, SpecStr) ->
+    {ok, {Module, Fun, Args}} = couch_util:parse_term(SpecStr),
+    {list_to_atom(Name), {Module, Fun, Args},
+        permanent, brutal_kill, worker, [Module]}.


Mime
View raw message