couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [2/6] Merge remote-tracking branch 'origin/import-master'
Date Tue, 06 May 2014 12:40:57 GMT
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_httpd_rewrite.erl
----------------------------------------------------------------------
diff --cc src/couch_httpd_rewrite.erl
index abd6af5,0000000..40d8a7b
mode 100644,000000..100644
--- a/src/couch_httpd_rewrite.erl
+++ b/src/couch_httpd_rewrite.erl
@@@ -1,483 -1,0 +1,480 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +% http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +%
 +% bind_path is based on bind method from Webmachine
 +
 +
 +%% @doc Module for URL rewriting by pattern matching.
 +
 +-module(couch_httpd_rewrite).
 +-export([handle_rewrite_req/3]).
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-define(SEPARATOR, $\/).
 +-define(MATCH_ALL, {bind, <<"*">>}).
 +
 +
 +%% @doc The http rewrite handler. All rewriting is done from
 +%% /dbname/_design/ddocname/_rewrite by default.
 +%%
 +%% each rule should be in the rewrites member of the design doc.
 +%% Ex of a complete rule :
 +%%
 +%%  {
 +%%      ....
 +%%      "rewrites": [
 +%%      {
 +%%          "from": "",
 +%%          "to": "index.html",
 +%%          "method": "GET",
 +%%          "query": {}
 +%%      }
 +%%      ]
 +%%  }
 +%%
 +%%  from: is the path rule used to bind the current uri to the rule. It
 +%% uses pattern matching for that.
 +%%
 +%%  to: rule to rewrite an url. It can contain variables depending on binding
 +%% variables discovered during pattern matching and query args (url args and from
 +%% the query member.)
 +%%
 +%%  method: method to bind the request method to the rule. by default "*"
 +%%  query: query args you want to define; they can contain dynamic variables
 +%% by binding the key to the bindings
 +%%
 +%%
 +%% to and from are path with  patterns. pattern can be string starting with ":" or
 +%% "*". ex:
 +%% /somepath/:var/*
 +%%
 +%% This path is converted into an Erlang list by splitting on "/". Each var is
 +%% converted into an atom. "*" is converted to the '*' atom. The pattern matching is done
 +%% by splitting "/" in request url in a list of token. A string pattern will
 +%% match equal token. The star atom ('*' in single quotes) will match any number
 +%% of tokens, but may only be present as the last pathterm in a pathspec. If all
 +%% tokens are matched and all pathterms are used, then the pathspec matches. It works
 +%% like webmachine. Each identified token will be reused in to rule and in query
 +%%
 +%% The pattern matching is done by first matching the request method to a rule. by
 +%% default all methods match a rule. (method is equal to "*" by default). Then
 +%% It will try to match the path to one rule. If no rule match, then a 404 error
 +%% is displayed.
 +%%
 +%% Once a rule is found we rewrite the request url using the "to" and
 +%% "query" members. The identified token are matched to the rule and
 +%% will replace var. if '*' is found in the rule it will contain the remaining
 +%% part if it exists.
 +%%
 +%% Examples:
 +%%
 +%% Dispatch rule            URL             TO                  Tokens
 +%%
 +%% {"from": "/a/b",         /a/b?k=v        /some/b?k=v         var =:= b
 +%% "to": "/some/"}                                              k = v
 +%%
 +%% {"from": "/a/b",         /a/b            /some/b?var=b       var =:= b
 +%% "to": "/some/:var"}
 +%%
 +%% {"from": "/a",           /a              /some
 +%% "to": "/some/*"}
 +%%
 +%% {"from": "/a/*",         /a/b/c          /some/b/c
 +%% "to": "/some/*"}
 +%%
 +%% {"from": "/a",           /a              /some
 +%% "to": "/some/*"}
 +%%
 +%% {"from": "/a/:foo/*",    /a/b/c          /some/b/c?foo=b     foo =:= b
 +%% "to": "/some/:foo/*"}
 +%%
 +%% {"from": "/a/:foo",     /a/b             /some/?k=b&foo=b    foo =:= b
 +%% "to": "/some",
 +%%  "query": {
 +%%      "k": ":foo"
 +%%  }}
 +%%
 +%% {"from": "/a",           /a?foo=b        /some/b             foo =:= b
 +%% "to": "/some/:foo",
 +%%  }}
 +
 +
 +
 +handle_rewrite_req(#httpd{
 +        path_parts=[DbName, <<"_design">>, DesignName, _Rewrite|PathParts],
 +        method=Method,
 +        mochi_req=MochiReq}=Req, _Db, DDoc) ->
 +
 +    % we are in a design handler
 +    DesignId = <<"_design/", DesignName/binary>>,
 +    Prefix = <<"/", (?l2b(couch_util:url_encode(DbName)))/binary, "/", DesignId/binary>>,
 +    QueryList = lists:map(fun decode_query_value/1, couch_httpd:qs(Req)),
 +
-     MaxRewritesList = config:get("httpd", "rewrite_limit", "100"),
-     MaxRewrites = list_to_integer(MaxRewritesList),
-     case get(couch_rewrite_count) of
-         undefined ->
-             put(couch_rewrite_count, 1);
-         NumRewrites when NumRewrites < MaxRewrites ->
-             put(couch_rewrite_count, NumRewrites + 1);
-         _ ->
-             throw({bad_request, <<"Exceeded rewrite recursion limit">>})
++    RewritesSoFar = erlang:get(?REWRITE_COUNT),
++    MaxRewrites = list_to_integer(config:get("httpd", "rewrite_limit", "100")),
++    case RewritesSoFar >= MaxRewrites of
++        true ->
++            throw({bad_request, <<"Exceeded rewrite recursion limit">>});
++        false ->
++            erlang:put(?REWRITE_COUNT, RewritesSoFar + 1)
 +    end,
 +
 +    #doc{body={Props}} = DDoc,
 +
 +    % get rules from ddoc
 +    case couch_util:get_value(<<"rewrites">>, Props) of
 +        undefined ->
 +            couch_httpd:send_error(Req, 404, <<"rewrite_error">>,
 +                <<"Invalid path.">>);
 +        Bin when is_binary(Bin) ->
 +            couch_httpd:send_error(Req, 400, <<"rewrite_error">>,
 +                <<"Rewrite rules are a String. They must be a JSON Array.">>);
 +        Rules ->
 +            % create dispatch list from rules
 +            DispatchList =  [make_rule(Rule) || {Rule} <- Rules],
 +            Method1 = couch_util:to_binary(Method),
 +
-             %% get raw path by matching url to a rule.
-             RawPath = case try_bind_path(DispatchList, Method1, 
-                     PathParts, QueryList) of
-                 no_dispatch_path ->
-                     throw(not_found);
-                 {NewPathParts, Bindings} ->
-                     Parts = [quote_plus(X) || X <- NewPathParts],
- 
-                     % build new path, reencode query args, eventually convert
-                     % them to json
-                     Bindings1 = maybe_encode_bindings(Bindings),
-                     Path = binary_to_list(
-                         iolist_to_binary([
-                                 string:join(Parts, [?SEPARATOR]),
-                                 [["?", mochiweb_util:urlencode(Bindings1)] 
-                                     || Bindings1 =/= [] ]
-                             ])),
-                     
-                     % if path is relative detect it and rewrite path
-                     case mochiweb_util:safe_relative_path(Path) of
-                         undefined ->
-                             ?b2l(Prefix) ++ "/" ++ Path;
-                         P1 ->
-                             ?b2l(Prefix) ++ "/" ++ P1
-                     end
- 
-                 end,
- 
-             % normalize final path (fix levels "." and "..")
-             RawPath1 = ?b2l(iolist_to_binary(normalize_path(RawPath))),
++            % get raw path by matching url to a rule. Throws not_found.
++            {NewPathParts0, Bindings0} =
++                try_bind_path(DispatchList, Method1, PathParts, QueryList),
++            NewPathParts = [quote_plus(X) || X <- NewPathParts0],
++            Bindings = maybe_encode_bindings(Bindings0),
++
++            Path0 = string:join(NewPathParts, [?SEPARATOR]),
++
++            % if path is relative detect it and rewrite path
++            Path1 = case mochiweb_util:safe_relative_path(Path0) of
++                undefined ->
++                    ?b2l(Prefix) ++ "/" ++ Path0;
++                P1 ->
++                    ?b2l(Prefix) ++ "/" ++ P1
++            end,
++
++            Path2 = normalize_path(Path1),
++
++            Path3 = case Bindings of
++                [] ->
++                    Path2;
++                _ ->
++                    [Path2, "?", mochiweb_util:urlencode(Bindings)]
++            end,
++
++            RawPath1 = ?b2l(iolist_to_binary(Path3)),
 +
 +            % In order to do OAuth correctly, we have to save the
 +            % requested path. We use default so chained rewriting
 +            % wont replace the original header.
 +            Headers = mochiweb_headers:default("x-couchdb-requested-path",
 +                                             MochiReq:get(raw_path),
 +                                             MochiReq:get(headers)),
 +
 +            ?LOG_DEBUG("rewrite to ~p ~n", [RawPath1]),
 +
 +            % build a new mochiweb request
 +            MochiReq1 = mochiweb_request:new(MochiReq:get(socket),
 +                                             MochiReq:get(method),
 +                                             RawPath1,
 +                                             MochiReq:get(version),
 +                                             Headers),
 +
 +            % cleanup; it forces mochiweb to reparse the raw uri.
 +            MochiReq1:cleanup(),
 +
 +            #httpd{
 +                db_url_handlers = DbUrlHandlers,
 +                design_url_handlers = DesignUrlHandlers,
 +                default_fun = DefaultFun,
 +                url_handlers = UrlHandlers,
-                 user_ctx = UserCtx
++                user_ctx = UserCtx,
++               auth = Auth
 +            } = Req,
++
++            erlang:put(pre_rewrite_auth, Auth),
 +            erlang:put(pre_rewrite_user_ctx, UserCtx),
 +            couch_httpd:handle_request_int(MochiReq1, DefaultFun,
 +                    UrlHandlers, DbUrlHandlers, DesignUrlHandlers)
 +        end.
 +
 +quote_plus({bind, X}) ->
 +    mochiweb_util:quote_plus(X);
 +quote_plus(X) ->
 +    mochiweb_util:quote_plus(X).
 +
 +%% @doc Try to find a rule matching current url. If none is found
 +%% 404 error not_found is raised
 +try_bind_path([], _Method, _PathParts, _QueryList) ->
-     no_dispatch_path;
++    throw(not_found);
 +try_bind_path([Dispatch|Rest], Method, PathParts, QueryList) ->
 +    [{PathParts1, Method1}, RedirectPath, QueryArgs, Formats] = Dispatch,
 +    case bind_method(Method1, Method) of
 +        true ->
 +            case bind_path(PathParts1, PathParts, []) of
 +                {ok, Remaining, Bindings} ->
 +                    Bindings1 = Bindings ++ QueryList,
 +                    % we parse query args from the rule and fill
 +                    % it eventually with bindings vars
 +                    QueryArgs1 = make_query_list(QueryArgs, Bindings1,
 +                        Formats, []),
 +                    % remove params in QueryLists1 that are already in
 +                    % QueryArgs1
 +                    Bindings2 = lists:foldl(fun({K, V}, Acc) ->
 +                        K1 = to_binding(K),
 +                        KV = case couch_util:get_value(K1, QueryArgs1) of
 +                            undefined -> [{K1, V}];
 +                            _V1 -> []
 +                        end,
 +                        Acc ++ KV
 +                    end, [], Bindings1),
 +
 +                    FinalBindings = Bindings2 ++ QueryArgs1,
 +                    NewPathParts = make_new_path(RedirectPath, FinalBindings,
 +                                    Remaining, []),
 +                    {NewPathParts, FinalBindings};
 +                fail ->
 +                    try_bind_path(Rest, Method, PathParts, QueryList)
 +            end;
 +        false ->
 +            try_bind_path(Rest, Method, PathParts, QueryList)
 +    end.
 +
 +%% rewriting dynamically the query list given as query member in
 +%% rewrites. Each value is replaced by one binding or an argument
 +%% passed in url.
 +make_query_list([], _Bindings, _Formats, Acc) ->
 +    Acc;
 +make_query_list([{Key, {Value}}|Rest], Bindings, Formats, Acc) ->
 +    Value1 = {Value},
 +    make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]);
 +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) when is_binary(Value) ->
 +    Value1 = replace_var(Value, Bindings, Formats),
 +    make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]);
 +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) when is_list(Value) ->
 +    Value1 = replace_var(Value, Bindings, Formats),
 +    make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]);
 +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) ->
 +    make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value}|Acc]).
 +
 +replace_var(<<"*">>=Value, Bindings, Formats) ->
 +    get_var(Value, Bindings, Value, Formats);
 +replace_var(<<":", Var/binary>> = Value, Bindings, Formats) ->
 +    get_var(Var, Bindings, Value, Formats);
 +replace_var(Value, _Bindings, _Formats) when is_binary(Value) ->
 +    Value;
 +replace_var(Value, Bindings, Formats) when is_list(Value) ->
 +    lists:reverse(lists:foldl(fun
 +                (<<":", Var/binary>>=Value1, Acc) ->
 +                    [get_var(Var, Bindings, Value1, Formats)|Acc];
 +                (Value1, Acc) ->
 +                    [Value1|Acc]
 +            end, [], Value));
 +replace_var(Value, _Bindings, _Formats) ->
 +    Value.
 +                    
 +maybe_json(Key, Value) ->
 +    case lists:member(Key, [<<"key">>, <<"startkey">>, <<"start_key">>,
 +                <<"endkey">>, <<"end_key">>, <<"keys">>]) of
 +        true ->
 +            ?JSON_ENCODE(Value);
 +        false ->
 +            Value
 +    end.
 +
 +get_var(VarName, Props, Default, Formats) ->
 +    VarName1 = to_binding(VarName),
 +    Val = couch_util:get_value(VarName1, Props, Default),
 +    maybe_format(VarName, Val, Formats).
 +
 +maybe_format(VarName, Value, Formats) ->
 +    case couch_util:get_value(VarName, Formats) of
 +        undefined ->
 +             Value;
 +        Format ->
 +            format(Format, Value)
 +    end.
 +
 +format(<<"int">>, Value) when is_integer(Value) ->
 +    Value;
 +format(<<"int">>, Value) when is_binary(Value) ->
 +    format(<<"int">>, ?b2l(Value));
 +format(<<"int">>, Value) when is_list(Value) ->
 +    case (catch list_to_integer(Value)) of
 +        IntVal when is_integer(IntVal) ->
 +            IntVal;
 +        _ ->
 +            Value
 +    end;
 +format(<<"bool">>, Value) when is_binary(Value) ->
 +    format(<<"bool">>, ?b2l(Value));
 +format(<<"bool">>, Value) when is_list(Value) ->
 +    case string:to_lower(Value) of
 +        "true" -> true;
 +        "false" -> false;
 +        _ -> Value
 +    end;
 +format(_Format, Value) ->
 +   Value. 
 +
 +%% @doc Build a new path from bindings. Bindings are query args
 +%% (+ dynamic query rewritten if needed) and bindings found in
 +%% bind_path step.
 +make_new_path([], _Bindings, _Remaining, Acc) ->
 +    lists:reverse(Acc);
 +make_new_path([?MATCH_ALL], _Bindings, Remaining, Acc) ->
 +    Acc1 = lists:reverse(Acc) ++ Remaining,
 +    Acc1;
 +make_new_path([?MATCH_ALL|_Rest], _Bindings, Remaining, Acc) ->
 +    Acc1 = lists:reverse(Acc) ++ Remaining,
 +    Acc1;
 +make_new_path([{bind, P}|Rest], Bindings, Remaining, Acc) ->
 +    P2 = case couch_util:get_value({bind, P}, Bindings) of
 +        undefined -> << "undefined">>;
 +        P1 -> 
 +            iolist_to_binary(P1)
 +    end,
 +    make_new_path(Rest, Bindings, Remaining, [P2|Acc]);
 +make_new_path([P|Rest], Bindings, Remaining, Acc) ->
 +    make_new_path(Rest, Bindings, Remaining, [P|Acc]).
 +
 +
 +%% @doc Check if the method of the request fits the rule method. If the
 +%% method rule is '*', which is the default, all
 +%% request method will bind. It allows us to make rules
 +%% depending on HTTP method.
 +bind_method(?MATCH_ALL, _Method ) ->
 +    true;
 +bind_method({bind, Method}, Method) ->
 +    true;
 +bind_method(_, _) ->
 +    false.
 +
 +
 +%% @doc bind path. Using the rule from we try to bind variables given
 +%% to the current url by pattern matching
 +bind_path([], [], Bindings) ->
 +    {ok, [], Bindings};
 +bind_path([?MATCH_ALL], [Match|_RestMatch]=Rest, Bindings) ->
 +    {ok, Rest, [{?MATCH_ALL, Match}|Bindings]};
 +bind_path(_, [], _) ->
 +    fail;
 +bind_path([{bind, Token}|RestToken],[Match|RestMatch],Bindings) ->
 +    bind_path(RestToken, RestMatch, [{{bind, Token}, Match}|Bindings]);
 +bind_path([Token|RestToken], [Token|RestMatch], Bindings) ->
 +    bind_path(RestToken, RestMatch, Bindings);
 +bind_path(_, _, _) ->
 +    fail.
 +
 +
 +%% normalize path.
 +normalize_path(Path)  ->
 +    "/" ++ string:join(normalize_path1(string:tokens(Path,
 +                "/"), []), [?SEPARATOR]).
 +
 +
 +normalize_path1([], Acc) ->
 +    lists:reverse(Acc);
 +normalize_path1([".."|Rest], Acc) ->
 +    Acc1 = case Acc of
 +        [] -> [".."|Acc];
 +        [T|_] when T =:= ".." -> [".."|Acc];
 +        [_|R] -> R
 +    end,
 +    normalize_path1(Rest, Acc1);
 +normalize_path1(["."|Rest], Acc) ->
 +    normalize_path1(Rest, Acc);
 +normalize_path1([Path|Rest], Acc) ->
 +    normalize_path1(Rest, [Path|Acc]).
 +
 +
 +%% @doc transform json rule in erlang for pattern matching
 +make_rule(Rule) ->
 +    Method = case couch_util:get_value(<<"method">>, Rule) of
 +        undefined -> ?MATCH_ALL;
 +        M -> to_binding(M)
 +    end,
 +    QueryArgs = case couch_util:get_value(<<"query">>, Rule) of
 +        undefined -> [];
 +        {Args} -> Args
 +        end,
 +    FromParts  = case couch_util:get_value(<<"from">>, Rule) of
 +        undefined -> [?MATCH_ALL];
 +        From ->
 +            parse_path(From)
 +        end,
 +    ToParts  = case couch_util:get_value(<<"to">>, Rule) of
 +        undefined ->
 +            throw({error, invalid_rewrite_target});
 +        To ->
 +            parse_path(To)
 +        end,
 +    Formats = case couch_util:get_value(<<"formats">>, Rule) of
 +        undefined -> [];
 +        {Fmts} -> Fmts
 +    end,
 +    [{FromParts, Method}, ToParts, QueryArgs, Formats].
 +
 +parse_path(Path) ->
 +    {ok, SlashRE} = re:compile(<<"\\/">>),
 +    path_to_list(re:split(Path, SlashRE), [], 0).
 +
 +%% @doc convert a path rule (from or to) to an erlang list
 +%% * and path variable starting by ":" are converted
 +%% in erlang atom.
 +path_to_list([], Acc, _DotDotCount) ->
 +    lists:reverse(Acc);
 +path_to_list([<<>>|R], Acc, DotDotCount) ->
 +    path_to_list(R, Acc, DotDotCount);
 +path_to_list([<<"*">>|R], Acc, DotDotCount) ->
 +    path_to_list(R, [?MATCH_ALL|Acc], DotDotCount);
 +path_to_list([<<"..">>|R], Acc, DotDotCount) when DotDotCount == 2 ->
 +    case config:get("httpd", "secure_rewrites", "true") of
 +    "false" ->
 +        path_to_list(R, [<<"..">>|Acc], DotDotCount+1);
 +    _Else ->
 +        ?LOG_INFO("insecure_rewrite_rule ~p blocked", [lists:reverse(Acc) ++ [<<"..">>] ++ R]),
 +        throw({insecure_rewrite_rule, "too many ../.. segments"})
 +    end;
 +path_to_list([<<"..">>|R], Acc, DotDotCount) ->
 +    path_to_list(R, [<<"..">>|Acc], DotDotCount+1);
 +path_to_list([P|R], Acc, DotDotCount) ->
 +    P1 = case P of
 +        <<":", Var/binary>> ->
 +            to_binding(Var);
 +        _ -> P
 +    end,
 +    path_to_list(R, [P1|Acc], DotDotCount).
 +
 +maybe_encode_bindings([]) ->
 +    [];
 +maybe_encode_bindings(Props) -> 
 +    lists:foldl(fun 
 +            ({{bind, <<"*">>}, _V}, Acc) ->
 +                Acc;
 +            ({{bind, K}, V}, Acc) ->
 +                V1 = iolist_to_binary(maybe_json(K, V)),
 +                [{K, V1}|Acc]
 +        end, [], Props).
 +                
 +decode_query_value({K,V}) ->
 +    case lists:member(K, ["key", "startkey", "start_key",
 +                "endkey", "end_key", "keys"]) of
 +        true ->
 +            {to_binding(K), ?JSON_DECODE(V)};
 +        false ->
 +            {to_binding(K), ?l2b(V)}
 +    end.
 +
 +to_binding({bind, V}) ->
 +    {bind, V};
 +to_binding(V) when is_list(V) ->
 +    to_binding(?l2b(V));
 +to_binding(V) ->
 +    {bind, V}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_httpd_stats_handlers.erl
----------------------------------------------------------------------
diff --cc src/couch_httpd_stats_handlers.erl
index b858830,0000000..cd357ea
mode 100644,000000..100644
--- a/src/couch_httpd_stats_handlers.erl
+++ b/src/couch_httpd_stats_handlers.erl
@@@ -1,56 -1,0 +1,56 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_httpd_stats_handlers).
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-export([handle_stats_req/1]).
 +-import(couch_httpd, [
 +    send_json/2, send_json/3, send_json/4, send_method_not_allowed/2,
 +    start_json_response/2, send_chunk/2, end_json_response/1,
 +    start_chunked_response/3, send_error/4
 +]).
 +
 +handle_stats_req(#httpd{method='GET', path_parts=[_]}=Req) ->
 +    flush(Req),
 +    send_json(Req, couch_stats_aggregator:all(range(Req)));
 +
 +handle_stats_req(#httpd{method='GET', path_parts=[_, _Mod]}) ->
-     throw({bad_request, <<"Stat names must have exactly to parts.">>});
++    throw({bad_request, <<"Stat names must have exactly two parts.">>});
 +
 +handle_stats_req(#httpd{method='GET', path_parts=[_, Mod, Key]}=Req) ->
 +    flush(Req),
 +    Stats = couch_stats_aggregator:get_json({list_to_atom(binary_to_list(Mod)),
 +        list_to_atom(binary_to_list(Key))}, range(Req)),
 +    send_json(Req, {[{Mod, {[{Key, Stats}]}}]});
 +
 +handle_stats_req(#httpd{method='GET', path_parts=[_, _Mod, _Key | _Extra]}) ->
 +    throw({bad_request, <<"Stat names must have exactly two parts.">>});
 +
 +handle_stats_req(Req) ->
 +    send_method_not_allowed(Req, "GET").
 +
 +range(Req) ->
 +    case couch_util:get_value("range", couch_httpd:qs(Req)) of
 +        undefined ->
 +            0;
 +        Value ->
 +            list_to_integer(Value)
 +    end.
 +
 +flush(Req) ->
 +    case couch_util:get_value("flush", couch_httpd:qs(Req)) of
 +        "true" ->
 +            couch_stats_aggregator:collect_sample();
 +        _Else ->
 +            ok
 +    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_js_functions.hrl
----------------------------------------------------------------------
diff --cc src/couch_js_functions.hrl
index 2ecd851,0000000..a48feae
mode 100644,000000..100644
--- a/src/couch_js_functions.hrl
+++ b/src/couch_js_functions.hrl
@@@ -1,155 -1,0 +1,170 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License.  You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-define(AUTH_DB_DOC_VALIDATE_FUNCTION, <<"
 +    function(newDoc, oldDoc, userCtx, secObj) {
 +        if (newDoc._deleted === true) {
 +            // allow deletes by admins and matching users
 +            // without checking the other fields
 +            if ((userCtx.roles.indexOf('_admin') !== -1) ||
 +                (userCtx.name == oldDoc.name)) {
 +                return;
 +            } else {
 +                throw({forbidden: 'Only admins may delete other user docs.'});
 +            }
 +        }
 +
 +        if ((oldDoc && oldDoc.type !== 'user') || newDoc.type !== 'user') {
 +            throw({forbidden : 'doc.type must be user'});
 +        } // we only allow user docs for now
 +
 +        if (!newDoc.name) {
 +            throw({forbidden: 'doc.name is required'});
 +        }
 +
 +        if (!newDoc.roles) {
 +            throw({forbidden: 'doc.roles must exist'});
 +        }
 +
 +        if (!isArray(newDoc.roles)) {
 +            throw({forbidden: 'doc.roles must be an array'});
 +        }
 +
++        for (var idx = 0; idx < newDoc.roles.length; idx++) {
++            if (typeof newDoc.roles[idx] !== 'string') {
++                throw({forbidden: 'doc.roles can only contain strings'});
++            }
++        }
++
 +        if (newDoc._id !== ('org.couchdb.user:' + newDoc.name)) {
 +            throw({
 +                forbidden: 'Doc ID must be of the form org.couchdb.user:name'
 +            });
 +        }
 +
 +        if (oldDoc) { // validate all updates
 +            if (oldDoc.name !== newDoc.name) {
 +                throw({forbidden: 'Usernames can not be changed.'});
 +            }
 +        }
 +
 +        if (newDoc.password_sha && !newDoc.salt) {
 +            throw({
 +                forbidden: 'Users with password_sha must have a salt.' +
 +                    'See /_utils/script/couch.js for example code.'
 +            });
 +        }
 +
++        if (newDoc.password_scheme === \"pbkdf2\") {
++            if (typeof(newDoc.iterations) !== \"number\") {
++               throw({forbidden: \"iterations must be a number.\"});
++            }
++            if (typeof(newDoc.derived_key) !== \"string\") {
++               throw({forbidden: \"derived_key must be a string.\"});
++            }
++        }
++
 +        var is_server_or_database_admin = function(userCtx, secObj) {
 +            // see if the user is a server admin
 +            if(userCtx.roles.indexOf('_admin') !== -1) {
 +                return true; // a server admin
 +            }
 +
 +            // see if the user a database admin specified by name
 +            if(secObj && secObj.admins && secObj.admins.names) {
 +                if(secObj.admins.names.indexOf(userCtx.name) !== -1) {
 +                    return true; // database admin
 +                }
 +            }
 +
 +            // see if the user a database admin specified by role
 +            if(secObj && secObj.admins && secObj.admins.roles) {
 +                var db_roles = secObj.admins.roles;
 +                for(var idx = 0; idx < userCtx.roles.length; idx++) {
 +                    var user_role = userCtx.roles[idx];
 +                    if(db_roles.indexOf(user_role) !== -1) {
 +                        return true; // role matches!
 +                    }
 +                }
 +            }
 +
 +            return false; // default to no admin
 +        }
 +
 +        if (!is_server_or_database_admin(userCtx, secObj)) {
 +            if (oldDoc) { // validate non-admin updates
 +                if (userCtx.name !== newDoc.name) {
 +                    throw({
 +                        forbidden: 'You may only update your own user document.'
 +                    });
 +                }
 +                // validate role updates
 +                var oldRoles = oldDoc.roles.sort();
 +                var newRoles = newDoc.roles.sort();
 +
 +                if (oldRoles.length !== newRoles.length) {
 +                    throw({forbidden: 'Only _admin may edit roles'});
 +                }
 +
 +                for (var i = 0; i < oldRoles.length; i++) {
 +                    if (oldRoles[i] !== newRoles[i]) {
 +                        throw({forbidden: 'Only _admin may edit roles'});
 +                    }
 +                }
 +            } else if (newDoc.roles.length > 0) {
 +                throw({forbidden: 'Only _admin may set roles'});
 +            }
 +        }
 +
 +        // no system roles in users db
 +        for (var i = 0; i < newDoc.roles.length; i++) {
 +            if (newDoc.roles[i][0] === '_') {
 +                throw({
 +                    forbidden:
 +                    'No system roles (starting with underscore) in users db.'
 +                });
 +            }
 +        }
 +
 +        // no system names as names
 +        if (newDoc.name[0] === '_') {
 +            throw({forbidden: 'Username may not start with underscore.'});
 +        }
 +
 +        var badUserNameChars = [':'];
 +
 +        for (var i = 0; i < badUserNameChars.length; i++) {
 +            if (newDoc.name.indexOf(badUserNameChars[i]) >= 0) {
 +                throw({forbidden: 'Character `' + badUserNameChars[i] +
 +                        '` is not allowed in usernames.'});
 +            }
 +        }
 +    }
 +">>).
 +
 +
 +-define(OAUTH_MAP_FUN, <<"
 +    function(doc) {
 +        if (doc.type === 'user' && doc.oauth && doc.oauth.consumer_keys) {
 +            for (var consumer_key in doc.oauth.consumer_keys) {
 +                for (var token in doc.oauth.tokens) {
 +                    var obj = {
 +                        'consumer_secret': doc.oauth.consumer_keys[consumer_key],
 +                        'token_secret': doc.oauth.tokens[token],
 +                        'username': doc.name
 +                    };
 +                    emit([consumer_key, token], obj);
 +                }
 +            }
 +        }
 +    }
 +">>).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_native_process.erl
----------------------------------------------------------------------
diff --cc src/couch_native_process.erl
index 8ca56f0,0000000..d4b2deb
mode 100644,000000..100644
--- a/src/couch_native_process.erl
+++ b/src/couch_native_process.erl
@@@ -1,422 -1,0 +1,413 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License");
 +% you may not use this file except in compliance with the License.
 +%
 +% You may obtain a copy of the License at
 +% http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing,
 +% software distributed under the License is distributed on an
 +% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 +% either express or implied.
 +%
 +% See the License for the specific language governing permissions
 +% and limitations under the License.
 +%
 +% This file drew much inspiration from erlview, which was written by and
 +% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
 +%
 +%
 +% This module provides the smallest possible native view-server.
 +% With this module in-place, you can add the following to your couch INI files:
 +%  [native_query_servers]
 +%  erlang={couch_native_process, start_link, []}
 +%
 +% Which will then allow the following example map function to be used:
 +%
 +%  fun({Doc}) ->
 +%    % Below, we emit a single record - the _id as key, null as value
 +%    DocId = couch_util:get_value(<<"_id">>, Doc, null),
 +%    Emit(DocId, null)
 +%  end.
 +%
 +% which should be roughly the same as the javascript:
 +%    emit(doc._id, null);
 +%
 +% This module exposes enough functions such that a native erlang server can
 +% act as a fully-fledged view server, but no 'helper' functions specifically
 +% for simplifying your erlang view code.  It is expected other third-party
 +% extensions will evolve which offer useful layers on top of this view server
 +% to help simplify your view code.
 +-module(couch_native_process).
 +-behaviour(gen_server).
 +
 +-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
 +         handle_info/2]).
- -export([set_timeout/2, prompt/2, prompt_many/2]).
++-export([set_timeout/2, prompt/2]).
 +
 +-define(STATE, native_proc_state).
 +-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +% Start an (unregistered) native query server process.
 +start_link() ->
 +    gen_server:start_link(?MODULE, [], []).
 +
 +% this is a bit messy, see also couch_query_servers handle_info
 +% stop(_Pid) ->
 +%     ok.
 +
 +% Set the timeout (milliseconds) used for list-function message waits.
 +set_timeout(Pid, TimeOut) ->
 +    gen_server:call(Pid, {set_timeout, TimeOut}).
 +
 +% Forward one view-server command (a JSON-decoded list) to the server.
 +prompt(Pid, Data) when is_list(Data) ->
 +    gen_server:call(Pid, {prompt, Data}).
 +
- prompt_many(Pid, DataList) ->
-     prompt_many(Pid, DataList, []).
- 
- prompt_many(_Pid, [], Acc) ->
-     {ok, lists:reverse(Acc)};
- prompt_many(Pid, [Data | Rest], Acc) ->
-     Result = prompt(Pid, Data),
-     prompt_many(Pid, Rest, [Result | Acc]).
- 
 +% gen_server callbacks
 +init([]) ->
 +    {ok, #evstate{ddocs=dict:new()}}.
 +
 +handle_call({set_timeout, TimeOut}, _From, State) ->
 +    {reply, ok, State#evstate{timeout=TimeOut}};
 +
 +% Run one command via run/2; thrown {error, Why} is turned into the
 +% standard three-element error reply instead of crashing the server.
 +handle_call({prompt, Data}, _From, State) ->
 +    ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
 +    {NewState, Resp} = try run(State, to_binary(Data)) of
 +        {S, R} -> {S, R}
 +        catch
 +            throw:{error, Why} ->
 +                {State, [<<"error">>, Why, Why]}
 +        end,
 +
 +    case Resp of
 +        {error, Reason} ->
 +            Msg = io_lib:format("couch native server error: ~p", [Reason]),
 +            {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState};
 +        [<<"error">> | Rest] ->
 +            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
 +            % TODO: markh? (jan)
 +            {reply, [<<"error">> | Rest], NewState};
 +        [<<"fatal">> | Rest] ->
 +            % A fatal response stops this server after replying.
 +            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
 +            % TODO: markh? (jan)
 +            {stop, fatal, [<<"error">> | Rest], NewState};
 +        Resp ->
 +            {reply, Resp, NewState}
 +    end.
 +
 +handle_cast(garbage_collect, State) ->
 +    erlang:garbage_collect(),
 +    {noreply, State};
 +handle_cast(foo, State) -> {noreply, State}.
 +
 +% Normal exits of linked list processes are expected; anything else stops us.
 +handle_info({'EXIT',_,normal}, State) -> {noreply, State};
 +handle_info({'EXIT',_,Reason}, State) ->
 +    {stop, Reason, State}.
 +terminate(_Reason, _State) -> ok.
 +code_change(_OldVersion, State, _Extra) -> {ok, State}.
 +
 +% Dispatch one view-server command. While a list function is active
 +% (list_pid is a pid) only list_row/list_end are accepted; everything
 +% else is rejected with list_error.
 +run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
 +    Pid ! {self(), list_row, Row},
 +    receive
 +        {Pid, chunks, Data} ->
 +            {State, [<<"chunks">>, Data]};
 +        {Pid, list_end, Data} ->
 +            % The list process is done; wait for its normal EXIT before
 +            % restoring the previous trap_exit setting saved in do_trap.
 +            receive
 +                {'EXIT', Pid, normal} -> ok
 +            after State#evstate.timeout ->
 +                throw({timeout, list_cleanup})
 +            end,
 +            process_flag(trap_exit, erlang:get(do_trap)),
 +            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
 +    after State#evstate.timeout ->
 +        throw({timeout, list_row})
 +    end;
 +run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
 +    Pid ! {self(), list_end},
 +    Resp =
 +    receive
 +        {Pid, list_end, Data} ->
 +            receive
 +                {'EXIT', Pid, normal} -> ok
 +            after State#evstate.timeout ->
 +                throw({timeout, list_cleanup})
 +            end,
 +            [<<"end">>, Data]
 +    after State#evstate.timeout ->
 +        throw({timeout, list_end})
 +    end,
 +    process_flag(trap_exit, erlang:get(do_trap)),
 +    {State#evstate{list_pid=nil}, Resp};
 +run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
 +    {State, [<<"error">>, list_error, list_error]};
 +% reset clears compiled funs but keeps the ddoc cache.
 +run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
 +    {#evstate{ddocs=DDocs}, true};
 +run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) ->
 +    {#evstate{ddocs=DDocs, query_config=QueryConfig}, true};
 +run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
 +    FunInfo = makefun(State, BinFunc),
 +    {State#evstate{funs=Funs ++ [FunInfo]}, true};
 +% Map: each fun emits via the process dictionary keyed by its Sig.
 +run(State, [<<"map_doc">> , Doc]) ->
 +    Resp = lists:map(fun({Sig, Fun}) ->
 +        erlang:put(Sig, []),
 +        Fun(Doc),
 +        lists:reverse(erlang:get(Sig))
 +    end, State#evstate.funs),
 +    {State, Resp};
 +run(State, [<<"reduce">>, Funs, KVs]) ->
 +    {Keys, Vals} =
 +    lists:foldl(fun([K, V], {KAcc, VAcc}) ->
 +        {[K | KAcc], [V | VAcc]}
 +    end, {[], []}, KVs),
 +    Keys2 = lists:reverse(Keys),
 +    Vals2 = lists:reverse(Vals),
 +    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
 +run(State, [<<"rereduce">>, Funs, Vals]) ->
 +    {State, catch reduce(State, Funs, null, Vals, true)};
 +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
 +    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
 +    {State#evstate{ddocs=DDocs2}, true};
 +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
 +    DDoc = load_ddoc(DDocs, DDocId),
 +    ddoc(State, DDoc, Rest);
 +run(_, Unknown) ->
 +    ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
 +    throw({error, unknown_command}).
 +    
 +% Resolve the function source at FunPath inside the ddoc props (folding
 +% key by key), compile it with the ddoc bound, then dispatch to ddoc/4.
 +ddoc(State, {DDoc}, [FunPath, Args]) ->
 +    % load fun from the FunPath
 +    BFun = lists:foldl(fun
 +        (Key, {Props}) when is_list(Props) ->
 +            couch_util:get_value(Key, Props, nil);
 +        (_Key, Fun) when is_binary(Fun) ->
 +            Fun;
 +        (_Key, nil) ->
 +            throw({error, not_found});
 +        (_Key, _Fun) ->
 +            throw({error, malformed_ddoc})
 +        end, {DDoc}, FunPath),
 +    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
 +
 +% Execute a compiled ddoc function; dispatch on the first FunPath segment.
 +ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
 +    {State, (catch apply(Fun, Args))};
 +ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
 +    % Non-boolean results (crashes) are logged; their ?LOG_ERROR return
 +    % value ends up in the response list as-is.
 +    FilterFunWrapper = fun(Doc) ->
 +        case catch Fun(Doc, Req) of
 +        true -> true;
 +        false -> false;
 +        {'EXIT', Error} -> ?LOG_ERROR("~p", [Error])
 +        end
 +    end,
 +    Resp = lists:map(FilterFunWrapper, Docs),
 +    {State, [true, Resp]};
 +ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
 +    Resp = case (catch apply(Fun, Args)) of
 +        FunResp when is_list(FunResp) ->
 +            FunResp;
 +        {FunResp} ->
 +            % A bare props tuple is wrapped as a response object.
 +            [<<"resp">>, {FunResp}];
 +        FunResp ->
 +            FunResp
 +    end,
 +    {State, Resp};
 +ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
 +    % Only the two-element [Doc, Resp] shape is accepted; anything else
 +    % raises case_clause here.
 +    Resp = case (catch apply(Fun, Args)) of
 +        [JsonDoc, JsonResp]  ->
 +            [<<"up">>, JsonDoc, JsonResp]
 +    end,
 +    {State, Resp};
 +% Lists run in a spawned, linked process that streams chunks back; the
 +% current trap_exit flag is saved in do_trap and restored at list end.
 +ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
 +    Self = self(),
 +    SpawnFun = fun() ->
 +        LastChunk = (catch apply(Fun, Args)),
 +        case start_list_resp(Self, Sig) of
 +            started ->
 +                receive
 +                    {Self, list_row, _Row} -> ignore;
 +                    {Self, list_end} -> ignore
 +                after State#evstate.timeout ->
 +                    throw({timeout, list_cleanup_pid})
 +                end;
 +            _ ->
 +                ok
 +        end,
 +        LastChunks =
 +        case erlang:get(Sig) of
 +            undefined -> [LastChunk];
 +            OtherChunks -> [LastChunk | OtherChunks]
 +        end,
 +        Self ! {self(), list_end, lists:reverse(LastChunks)}
 +    end,
 +    erlang:put(do_trap, process_flag(trap_exit, true)),
 +    Pid = spawn_link(SpawnFun),
 +    Resp =
 +    receive
 +        {Pid, start, Chunks, JsonResp} ->
 +            [<<"start">>, Chunks, JsonResp]
 +    after State#evstate.timeout ->
 +        throw({timeout, list_start})
 +    end,
 +    {State#evstate{list_pid=Pid}, Resp}.
 +
 +% Cache a ddoc body under its id.
 +store_ddoc(DDocs, DDocId, DDoc) ->
 +    dict:store(DDocId, DDoc, DDocs).
 +% Fetch a cached ddoc; a miss becomes a thrown {error, Binary} with a
 +% human-readable message rather than a badarg crash.
 +load_ddoc(DDocs, DDocId) ->
 +    try dict:fetch(DDocId, DDocs) of
 +        {DDoc} -> {DDoc}
 +    catch
 +        _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
 +    end.
 +
 +% Build the variable bindings ('Emit', 'Send', 'GetRow', ...) made
 +% available to user funs. Emitted rows and sent chunks accumulate in the
 +% process dictionary under Sig (newest first; reversed on flush).
 +bindings(State, Sig) ->
 +    bindings(State, Sig, nil).
 +bindings(State, Sig, DDoc) ->
 +    Self = self(),
 +
 +    Log = fun(Msg) ->
 +        ?LOG_INFO(Msg, [])
 +    end,
 +
 +    Emit = fun(Id, Value) ->
 +        Curr = erlang:get(Sig),
 +        erlang:put(Sig, [[Id, Value] | Curr])
 +    end,
 +
 +    Start = fun(Headers) ->
 +        erlang:put(list_headers, Headers)
 +    end,
 +
 +    Send = fun(Chunk) ->
 +        Curr =
 +        case erlang:get(Sig) of
 +            undefined -> [];
 +            Else -> Else
 +        end,
 +        erlang:put(Sig, [Chunk | Curr])
 +    end,
 +
 +    % GetRow flushes any buffered chunks to the server process, then
 +    % blocks for the next row; nil signals end of input.
 +    GetRow = fun() ->
 +        case start_list_resp(Self, Sig) of
 +            started ->
 +                ok;
 +            _ ->
 +                Chunks =
 +                case erlang:get(Sig) of
 +                    undefined -> [];
 +                    CurrChunks -> CurrChunks
 +                end,
 +                Self ! {self(), chunks, lists:reverse(Chunks)}
 +        end,
 +        erlang:put(Sig, []),
 +        receive
 +            {Self, list_row, Row} -> Row;
 +            {Self, list_end} -> nil
 +        after State#evstate.timeout ->
 +            throw({timeout, list_pid_getrow})
 +        end
 +    end,
 +   
 +    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
 +
 +    Bindings = [
 +        {'Log', Log},
 +        {'Emit', Emit},
 +        {'Start', Start},
 +        {'Send', Send},
 +        {'GetRow', GetRow},
 +        {'FoldRows', FoldRows}
 +    ],
 +    % 'DDoc' is only bound for ddoc-scoped functions.
 +    case DDoc of
 +        {_Props} ->
 +            Bindings ++ [{'DDoc', DDoc}];
 +        _Else -> Bindings
 +    end.
 +
 +% thanks to erlview, via:
 +% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
 +% Compile an Erlang fun from its binary source. Sig (md5 of the source,
 +% plus the ddoc when one is bound) doubles as the process-dictionary key
 +% under which the fun's output accumulates.
 +makefun(State, Source) ->
 +    Sig = couch_util:md5(Source),
 +    BindFuns = bindings(State, Sig),
 +    {Sig, makefun(State, Source, BindFuns)}.
 +makefun(State, Source, {DDoc}) ->
 +    Sig = couch_util:md5(lists:flatten([Source, term_to_binary(DDoc)])),
 +    BindFuns = bindings(State, Sig, {DDoc}),
 +    {Sig, makefun(State, Source, BindFuns)};
 +% Scan/parse/eval the source with the given bindings; syntax errors are
 +% reported on standard_error and rethrown.
 +makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
 +    FunStr = binary_to_list(Source),
 +    {ok, Tokens, _} = erl_scan:string(FunStr),
 +    Form = case (catch erl_parse:parse_exprs(Tokens)) of
 +        {ok, [ParsedForm]} ->
 +            ParsedForm;
 +        {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
 +            io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]),
 +            io:format(standard_error, "~s~p~n", [Mesg, Params]),
 +            throw(Error)
 +    end,
 +    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
 +        erl_eval:add_binding(Name, Fun, Acc)
 +    end, erl_eval:new_bindings(), BindFuns),
 +    {value, Fun, _} = erl_eval:expr(Form, Bindings),
 +    Fun.
 +
 +% Apply one or more (re)reduce funs to the key/value columns and return
 +% the view-server [true, Reductions] response shape.
 +reduce(State, BinFuns, Keys, Vals, ReReduce) ->
 +    Funs = case is_list(BinFuns) of
 +        true ->
 +            lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
 +        _ ->
 +            [makefun(State, BinFuns)]
 +    end,
 +    Reds = lists:map(fun({_Sig, Fun}) ->
 +        Fun(Keys, Vals, ReReduce)
 +    end, Funs),
 +    [true, Reds].
 +
 +% Fold ProcRow over rows pulled via GetRow until it yields nil (end of
 +% input) or ProcRow returns {stop, Acc2}.
 +foldrows(GetRow, ProcRow, Acc) ->
 +    case GetRow() of
 +        nil ->
 +            {ok, Acc};
 +        Row ->
 +            case (catch ProcRow(Row, Acc)) of
 +                {ok, Acc2} ->
 +                    foldrows(GetRow, ProcRow, Acc2);
 +                {stop, Acc2} ->
 +                    {ok, Acc2}
 +            end
 +    end.
 +
 +% Send the initial list response (headers + buffered chunks) exactly
 +% once per list invocation; 'started' on first call, ok afterwards.
 +start_list_resp(Self, Sig) ->
 +    case erlang:get(list_started) of
 +        undefined ->
 +            Headers =
 +            case erlang:get(list_headers) of
 +                undefined -> {[{<<"headers">>, {[]}}]};
 +                CurrHdrs -> CurrHdrs
 +            end,
 +            Chunks =
 +            case erlang:get(Sig) of
 +                undefined -> [];
 +                CurrChunks -> CurrChunks
 +            end,
 +            Self ! {self(), start, lists:reverse(Chunks), Headers},
 +            erlang:put(list_started, true),
 +            erlang:put(Sig, []),
 +            started;
 +        _ ->
 +            ok
 +    end.
 +
 +% Recursively normalize decoded JSON terms: atoms (other than
 +% null/true/false) become binaries; tuples/lists are walked; everything
 +% else passes through unchanged.
 +to_binary({Data}) ->
 +    Pred = fun({Key, Value}) ->
 +        {to_binary(Key), to_binary(Value)}
 +    end,
 +    {lists:map(Pred, Data)};
 +to_binary(Data) when is_list(Data) ->
 +    [to_binary(D) || D <- Data];
 +to_binary(null) ->
 +    null;
 +to_binary(true) ->
 +    true;
 +to_binary(false) ->
 +    false;
 +to_binary(Data) when is_atom(Data) ->
 +    list_to_binary(atom_to_list(Data));
 +to_binary(Data) ->
 +    Data.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_os_daemons.erl
----------------------------------------------------------------------
diff --cc src/couch_os_daemons.erl
index 3560149,0000000..1c0d1dc
mode 100644,000000..100644
--- a/src/couch_os_daemons.erl
+++ b/src/couch_os_daemons.erl
@@@ -1,377 -1,0 +1,387 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +-module(couch_os_daemons).
 +-behaviour(gen_server).
 +-behaviour(config_listener).
 +
 +-export([start_link/0, info/0, info/1]).
 +
 +-export([init/1, terminate/2, code_change/3]).
 +-export([handle_call/3, handle_cast/2, handle_info/2]).
 +
 +% config_listener api
 +-export([handle_config_change/5]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-record(daemon, {
 +    port,
 +    name,
 +    cmd,
 +    kill,
 +    status=running,
 +    cfg_patterns=[],
 +    errors=[],
 +    buf=[]
 +}).
 +
 +-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
 +-define(TIMEOUT, 5000).
 +
 +% Start the singleton OS daemon manager, registered under the module name.
 +start_link() ->
 +    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 +
 +info() ->
 +    info([]).
 +
 +% With [table] in Options, returns the full daemon records; otherwise
 +% returns the (protected) ETS table handle itself.
 +info(Options) ->
 +    gen_server:call(?MODULE, {daemon_info, Options}).
 +
 +% State is an ETS table of #daemon{} records keyed on port. Exits are
 +% trapped so port deaths arrive as messages.
 +init(_) ->
 +    process_flag(trap_exit, true),
 +    ok = config:listen_for_changes(?MODULE, nil),
 +    Table = ets:new(?MODULE, [protected, set, {keypos, #daemon.port}]),
 +    reload_daemons(Table),
 +    {ok, Table}.
 +
 +% Kill every managed daemon on shutdown.
 +terminate(_Reason, Table) ->
 +    [stop_port(D) || D <- ets:tab2list(Table)],
 +    ok.
 +
 +handle_call({daemon_info, Options}, _From, Table) when is_list(Options) ->
 +    case lists:member(table, Options) of
 +        true ->
 +            {reply, {ok, ets:tab2list(Table)}, Table};
 +        _ ->
 +            {reply, {ok, Table}, Table}
 +    end;
 +% Unknown calls are fatal by design.
 +handle_call(Msg, From, Table) ->
 +    ?LOG_ERROR("Unknown call message to ~p from ~p: ~p", [?MODULE, From, Msg]),
 +    {stop, error, Table}.
 +
 +% Config changes restart any daemon registered for that section/key;
 +% changes in "os_daemons" itself also trigger a full reload.
 +handle_cast({config_change, Sect, Key}, Table) ->
 +    restart_daemons(Table, Sect, Key),
 +    case Sect of
 +        "os_daemons" -> reload_daemons(Table);
 +        _ -> ok
 +    end,
 +    {noreply, Table};
 +handle_cast(stop, Table) ->
 +    {stop, normal, Table};
 +handle_cast(Msg, Table) ->
 +    ?LOG_ERROR("Unknown cast message to ~p: ~p", [?MODULE, Msg]),
 +    {stop, error, Table}.
 +
 +% Info messages: config-listener supervision, port exit/status handling,
 +% and line-oriented stdio from the daemon ports. State is the ETS table
 +% of #daemon{} records keyed on port.
 +handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
 +    % Config listener died; re-register after a 5s back-off.
 +    erlang:send_after(5000, self(), restart_config_listener),
 +    {noreply, State};
 +handle_info(restart_config_listener, State) ->
 +    ok = config:listen_for_changes(?MODULE, nil),
 +    {noreply, State};
 +% Trapped exit of a port: act according to the daemon's recorded status.
 +handle_info({'EXIT', Port, Reason}, Table) ->
 +    case ets:lookup(Table, Port) of
 +        [] ->
 +            ?LOG_INFO("Port ~p exited after stopping: ~p~n", [Port, Reason]);
 +        [#daemon{status=stopping}] ->
 +            true = ets:delete(Table, Port);
 +        [#daemon{name=Name, status=restarting}=D] ->
 +            ?LOG_INFO("Daemon ~p restarting after config change.", [Name]),
 +            true = ets:delete(Table, Port),
 +            {ok, Port2} = start_port(D#daemon.cmd),
 +            true = ets:insert(Table, D#daemon{
 +                port=Port2, status=running, kill=undefined, buf=[]
 +            });
 +        [#daemon{name=Name, status=halted}] ->
 +            ?LOG_ERROR("Halted daemon process: ~p", [Name]);
 +        [D] ->
 +            ?LOG_ERROR("Invalid port state at exit: ~p", [D])
 +    end,
 +    {noreply, Table};
 +handle_info({Port, closed}, Table) ->
 +    handle_info({Port, {exit_status, closed}}, Table);
 +% Port exit status: restart, drop, or apply the error back-off policy.
 +handle_info({Port, {exit_status, Status}}, Table) ->
 +    case ets:lookup(Table, Port) of
 +        [] ->
 +            ?LOG_ERROR("Unknown port ~p exiting ~p", [Port, Status]),
 +            {stop, {error, unknown_port_died, Status}, Table};
 +        [#daemon{name=Name, status=restarting}=D] ->
 +            ?LOG_INFO("Daemon ~p restarting after config change.", [Name]),
 +            true = ets:delete(Table, Port),
 +            {ok, Port2} = start_port(D#daemon.cmd),
 +            true = ets:insert(Table, D#daemon{
 +                port=Port2, status=running, kill=undefined, buf=[]
 +            }),
 +            {noreply, Table};
 +        [#daemon{status=stopping}=D] ->
 +            % The configuration changed and this daemon is no
 +            % longer needed.
 +            ?LOG_DEBUG("Port ~p shut down.", [D#daemon.name]),
 +            true = ets:delete(Table, Port),
 +            {noreply, Table};
 +        [D] ->
 +            % Port died for unknown reason. Check to see if it's
 +            % died too many times or if we should boot it back up.
 +            case should_halt([now() | D#daemon.errors]) of
 +                {true, _} ->
 +                    % Halting the process. We won't try and reboot
 +                    % until the configuration changes.
 +                    Fmt = "Daemon ~p halted with exit_status ~p",
 +                    ?LOG_ERROR(Fmt, [D#daemon.name, Status]),
 +                    D2 = D#daemon{status=halted, errors=nil, buf=nil},
 +                    true = ets:insert(Table, D2),
 +                    {noreply, Table};
 +                {false, Errors} ->
 +                    % We're guessing it was a random error, this daemon
 +                    % has behaved so we'll give it another chance.
 +                    Fmt = "Daemon ~p is being rebooted after exit_status ~p",
 +                    ?LOG_INFO(Fmt, [D#daemon.name, Status]),
 +                    true = ets:delete(Table, Port),
 +                    {ok, Port2} = start_port(D#daemon.cmd),
 +                    true = ets:insert(Table, D#daemon{
 +                        port=Port2, status=running, kill=undefined,
 +                                                errors=Errors, buf=[]
 +                    }),
 +                    {noreply, Table}
 +            end;
 +        _Else ->
 +            throw(error)
 +    end;
 +% Partial line from a port: buffer it until the eol arrives.
 +handle_info({Port, {data, {noeol, Data}}}, Table) ->
 +    [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
 +    true = ets:insert(Table, D#daemon{buf=[Data | Buf]}),
 +    {noreply, Table};
 +handle_info({Port, {data, {eol, Data}}}, Table) ->
 +    [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
 +    Line = lists:reverse(Buf, Data),
 +    % The first line echoed back is the kill command
 +    % for when we go to get rid of the port. Lines after
 +    % that are considered part of the stdio API.
 +    case D#daemon.kill of
 +        undefined ->
 +            true = ets:insert(Table, D#daemon{kill=?b2l(Line), buf=[]});
 +        _Else ->
 +            D2 = case (catch ?JSON_DECODE(Line)) of
 +                {invalid_json, Rejected} ->
 +                    ?LOG_ERROR("Ignoring OS daemon request: ~p", [Rejected]),
 +                    D;
 +                JSON ->
 +                    {ok, D3} = handle_port_message(D, JSON),
 +                    D3
 +            end,
 +            true = ets:insert(Table, D2#daemon{buf=[]})
 +    end,
 +    {noreply, Table};
 +% Anything else from a known port: stop it and mark for restart.
 +handle_info({Port, Error}, Table) ->
 +    ?LOG_ERROR("Unexpected message from port ~p: ~p", [Port, Error]),
 +    stop_port(Port),
 +    [D] = ets:lookup(Table, Port),
 +    true = ets:insert(Table, D#daemon{status=restarting, buf=nil}),
 +    {noreply, Table};
 +handle_info(Msg, Table) ->
 +    ?LOG_ERROR("Unexpected info message to ~p: ~p", [?MODULE, Msg]),
 +    {stop, error, Table}.
 +
 +code_change(_OldVsn, State, _Extra) ->
 +    {ok, State}.
 +
 +
 +% config_listener callback: forward every change to ourselves as a cast.
 +handle_config_change(Section, Key, _, _, _) ->
 +    gen_server:cast(?MODULE, {config_change, Section, Key}),
 +    {ok, nil}.
 +
 +
 +% Internal API
 +
 +%
 +% Port management helpers
 +%
 +
 +% Launch a daemon command wrapped by couchspawnkillable, optionally
 +% extending the port's env; EnvPairs must be sorted for keymerge.
 +start_port(Command) ->
++    start_port(Command, []).
++
++start_port(Command, EnvPairs) ->
 +    PrivDir = couch_util:priv_dir(),
 +    Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
-     Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, ?PORT_OPTIONS),
++    Opts = case lists:keytake(env, 1, ?PORT_OPTIONS) of
++        false ->
++            ?PORT_OPTIONS ++ [ {env,EnvPairs} ];
++        {value, {env,OldPairs}, SubOpts} ->
++            AllPairs = lists:keymerge(1, EnvPairs, OldPairs),
++            SubOpts ++ [ {env,AllPairs} ]
++    end,
++    Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, Opts),
 +    {ok, Port}.
 +
 +
 +% Stop a daemon: run its recorded kill command if we ever received one,
 +% then close the port either way.
 +stop_port(#daemon{port=Port, kill=undefined}=D) ->
 +    ?LOG_ERROR("Stopping daemon without a kill command: ~p", [D#daemon.name]),
 +    catch port_close(Port);
 +stop_port(#daemon{port=Port}=D) ->
 +    ?LOG_DEBUG("Stopping daemon: ~p", [D#daemon.name]),
 +    os:cmd(D#daemon.kill),
 +    catch port_close(Port).
 +
 +
 +% stdio API from daemons: "get" answers config queries over the port,
 +% "register" subscribes the daemon to config sections/keys, "log" logs.
 +handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section]) ->
 +    KVs = config:get(Section),
 +    Data = lists:map(fun({K, V}) -> {?l2b(K), ?l2b(V)} end, KVs),
 +    Json = iolist_to_binary(?JSON_ENCODE({Data})),
 +    port_command(Port, <<Json/binary, "\n">>),
 +    {ok, Daemon};
 +handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section, Key]) ->
 +    Value = case config:get(Section, Key, null) of
 +        null -> null;
 +        String -> ?l2b(String)
 +    end,
 +    Json = iolist_to_binary(?JSON_ENCODE(Value)),
 +    port_command(Port, <<Json/binary, "\n">>),
 +    {ok, Daemon};
 +handle_port_message(Daemon, [<<"register">>, Sec]) when is_binary(Sec) ->
 +    Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [{?b2l(Sec)}]),
 +    {ok, Daemon#daemon{cfg_patterns=Patterns}};
 +handle_port_message(Daemon, [<<"register">>, Sec, Key])
 +                        when is_binary(Sec) andalso is_binary(Key) ->
 +    Pattern = {?b2l(Sec), ?b2l(Key)},
 +    Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [Pattern]),
 +    {ok, Daemon#daemon{cfg_patterns=Patterns}};
 +handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg]) ->
 +    handle_log_message(Name, Msg, <<"info">>),
 +    {ok, Daemon};
 +handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg, {Opts}]) ->
 +    Level = couch_util:get_value(<<"level">>, Opts, <<"info">>),
 +    handle_log_message(Name, Msg, Level),
 +    {ok, Daemon};
 +% Invalid requests are logged and otherwise ignored.
 +handle_port_message(#daemon{name=Name}=Daemon, Else) ->
 +    ?LOG_ERROR("Daemon ~p made invalid request: ~p", [Name, Else]),
 +    {ok, Daemon}.
 +
 +
 +% Route a daemon log line to the matching log level; unknown levels fall
 +% back to info. NOTE(review): the "Daemon: ~p" format here differs from
 +% the "Daemon ~p" used by the debug/info clauses — cosmetic inconsistency.
 +handle_log_message(Name, Msg, _Level) when not is_binary(Msg) ->
 +    ?LOG_ERROR("Invalid log message from daemon ~p: ~p", [Name, Msg]);
 +handle_log_message(Name, Msg, <<"debug">>) ->
 +    ?LOG_DEBUG("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
 +handle_log_message(Name, Msg, <<"info">>) ->
 +    ?LOG_INFO("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
 +handle_log_message(Name, Msg, <<"error">>) ->
 +    ?LOG_ERROR("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]);
 +handle_log_message(Name, Msg, Level) ->
 +    ?LOG_ERROR("Invalid log level from daemon: ~p", [Level]),
 +    ?LOG_INFO("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]).
 +
 +%
 +% Daemon management helpers
 +%
 +
 +% Reconcile the running daemons with the "os_daemons" config section:
 +% stop what is no longer configured, boot what is newly configured.
 +reload_daemons(Table) ->
 +    % List of daemons we want to have running.
 +    Configured = lists:sort(config:get("os_daemons")),
 +    
 +    % Remove records for daemons that were halted.
 +    MSpecHalted = #daemon{name='$1', cmd='$2', status=halted, _='_'},
 +    Halted = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecHalted)]),
 +    ok = stop_os_daemons(Table, find_to_stop(Configured, Halted, [])),
 +    
 +    % Stop daemons that are running
 +    % Start newly configured daemons
 +    MSpecRunning = #daemon{name='$1', cmd='$2', status=running, _='_'},
 +    Running = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecRunning)]),
 +    ok = stop_os_daemons(Table, find_to_stop(Configured, Running, [])),
 +    ok = boot_os_daemons(Table, find_to_boot(Configured, Running, [])),
 +    ok.
 +
 +
 +% Walk the table and restart any daemon that registered interest in the
 +% changed config section (or section/key pair).
 +restart_daemons(Table, Sect, Key) ->
 +    restart_daemons(Table, Sect, Key, ets:first(Table)).
 +
 +restart_daemons(_, _, _, '$end_of_table') ->
 +    ok;
 +restart_daemons(Table, Sect, Key, Port) ->
 +    [D] = ets:lookup(Table, Port),
 +    HasSect = lists:member({Sect}, D#daemon.cfg_patterns),
 +    HasKey = lists:member({Sect, Key}, D#daemon.cfg_patterns),
 +    case HasSect or HasKey of
 +        true ->
 +            % The EXIT handler sees status=restarting and reboots it.
 +            stop_port(D),
 +            D2 = D#daemon{status=restarting, buf=nil},
 +            true = ets:insert(Table, D2);
 +        _ ->
 +            ok
 +    end,
 +    restart_daemons(Table, Sect, Key, ets:next(Table, Port)).
 +    
 +
 +% Stop each listed {Name, Cmd} daemon: halted records are simply
 +% deleted; running ones are killed and marked stopping so the eventual
 +% exit message removes them.
 +stop_os_daemons(_Table, []) ->
 +    ok;
 +stop_os_daemons(Table, [{Name, Cmd} | Rest]) ->
 +    [[Port]] = ets:match(Table, #daemon{port='$1', name=Name, cmd=Cmd, _='_'}),
 +    [D] = ets:lookup(Table, Port),
 +    case D#daemon.status of
 +        halted ->
 +            ets:delete(Table, Port);
 +        _ ->
 +            stop_port(D),
 +            D2 = D#daemon{status=stopping, errors=nil, buf=nil},
 +            true = ets:insert(Table, D2)
 +    end,
 +    stop_os_daemons(Table, Rest).
 +    
 +% Start each listed {Name, Cmd} daemon and record it in the table.
 +boot_os_daemons(_Table, []) ->
 +    ok;
 +boot_os_daemons(Table, [{Name, Cmd} | Rest]) ->
 +    {ok, Port} = start_port(Cmd),
 +    true = ets:insert(Table, #daemon{port=Port, name=Name, cmd=Cmd}),
 +    boot_os_daemons(Table, Rest).
 +    
 +% Elements unique to the configured set need to be booted.
 +% Merge-walk of two sorted {Name, Cmd} lists (configured vs running).
 +find_to_boot([], _Rest, Acc) ->
 +    % Nothing else configured.
 +    Acc;
 +find_to_boot([D | R1], [D | R2], Acc) ->
 +    % Elements are equal, daemon already running.
 +    find_to_boot(R1, R2, Acc);
 +find_to_boot([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 ->
 +    % Configured but not running: boot it.
 +    find_to_boot(R1, A2, [D1 | Acc]);
 +find_to_boot(A1, [_ | R2], Acc) ->
 +    find_to_boot(A1, R2, Acc);
 +find_to_boot(Rest, [], Acc) ->
 +    % No more candidates for already running. Boot all.
 +    Rest ++ Acc.
 +
 +% Elements unique to the running set need to be killed.
 +find_to_stop([], Rest, Acc) ->
 +    % The rest haven't been found, so they must all
 +    % be ready to die.
 +    Rest ++ Acc;
 +find_to_stop([D | R1], [D | R2], Acc) ->
 +    % Elements are equal, daemon already running.
 +    find_to_stop(R1, R2, Acc);
 +find_to_stop([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 ->
 +    find_to_stop(R1, A2, Acc);
 +find_to_stop(A1, [D2 | R2], Acc) ->
 +    % Running but not configured: stop it.
 +    find_to_stop(A1, R2, [D2 | Acc]);
 +find_to_stop(_, [], Acc) ->
 +    % No more running daemons to worry about.
 +    Acc.
 +
 +% Decide whether a crashing daemon should be halted: true when at least
 +% max_retries errors happened within the last retry_time seconds.
 +% Returns {Halt, RecentErrors} so the caller can keep the pruned list.
 +% NOTE(review): now() is deprecated in modern OTP; os:timestamp/0 would
 +% be the drop-in replacement — confirm target OTP version first.
 +should_halt(Errors) ->
 +    RetryTimeCfg = config:get("os_daemon_settings", "retry_time", "5"),
 +    RetryTime = list_to_integer(RetryTimeCfg),
 +
 +    Now = now(),
 +    RecentErrors = lists:filter(fun(Time) ->
 +        timer:now_diff(Now, Time) =< RetryTime * 1000000
 +    end, Errors),
 +
 +    RetryCfg = config:get("os_daemon_settings", "max_retries", "3"),
 +    Retries = list_to_integer(RetryCfg),
 +
 +    {length(RecentErrors) >= Retries, RecentErrors}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_os_process.erl
----------------------------------------------------------------------
diff --cc src/couch_os_process.erl
index c6e6520,0000000..6756dd3
mode 100644,000000..100644
--- a/src/couch_os_process.erl
+++ b/src/couch_os_process.erl
@@@ -1,285 -1,0 +1,249 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_os_process).
 +-behaviour(gen_server).
 +
 +-export([start_link/1, start_link/2, start_link/3, stop/1]).
- -export([set_timeout/2, prompt/2, prompt_many/2, killer/1]).
++-export([set_timeout/2, prompt/2, killer/1]).
 +-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
 +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).
 +
 +-record(os_proc,
 +    {command,
 +     port,
 +     writer,
 +     reader,
 +     timeout=5000,
 +     idle
 +    }).
 +
 +% Start a linked gen_server wrapping an external OS process.
 +% start_link/1 and start_link/2 delegate down, defaulting Options to []
 +% and the port options to ?PORT_OPTIONS (line-oriented, binary, with
 +% exit_status so we learn when the child dies).
 +start_link(Command) ->
 +    start_link(Command, []).
 +start_link(Command, Options) ->
 +    start_link(Command, Options, ?PORT_OPTIONS).
 +start_link(Command, Options, PortOptions) ->
 +    gen_server:start_link(couch_os_process, [Command, Options, PortOptions], []).
 +
 +% Asynchronously stop the server (handled in handle_cast(stop, ...)).
 +stop(Pid) ->
 +    gen_server:cast(Pid, stop).
 +
 +% Read/Write API
 +% Set the per-read timeout (milliseconds) used by readline/2.
 +set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
 +    ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).
 +
 +% Used by couch_db_update_notifier.erl
 +% Fire-and-forget write to the OS process; no reply is read.
 +send(Pid, Data) ->
 +    gen_server:cast(Pid, {send, Data}).
 +
 +% Synchronous request/response round trip with the OS process.
 +% Returns the decoded result, or logs and re-throws any error reply.
 +prompt(Pid, Data) ->
 +    case gen_server:call(Pid, {prompt, Data}, infinity) of
 +        {ok, Result} ->
 +            Result;
 +        Error ->
 +            ?LOG_ERROR("OS Process Error ~p :: ~p",[Pid,Error]),
 +            throw(Error)
 +    end.
 +
- prompt_many(Pid, DataList) ->
-     OsProc = gen_server:call(Pid, get_os_proc, infinity),
-     true = port_connect(OsProc#os_proc.port, self()),
-     try
-         send_many(OsProc, DataList),
-         receive_many(length(DataList), OsProc, [])
-     after
-         % Can throw badarg error, when OsProc Pid is dead or port was closed
-         % by the readline function on error/timeout.
-         (catch port_connect(OsProc#os_proc.port, Pid)),
-         unlink(OsProc#os_proc.port),
-         drop_port_messages(OsProc#os_proc.port)
-     end.
- 
- send_many(_OsProc, []) ->
-     ok;
- send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
-     Writer(OsProc, Data),
-     send_many(OsProc, Rest).
- 
- receive_many(0, _OsProc, Acc) ->
-     {ok, lists:reverse(Acc)};
- receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
-     Line = Reader(OsProc),
-     receive_many(N - 1, OsProc, [Line | Acc]).
- 
- drop_port_messages(Port) ->
-     receive
-     {Port, _} ->
-         drop_port_messages(Port)
-     after 0 ->
-         ok
-     end.
- 
 +% Utility functions for reading and writing
 +% in custom functions
 +% Write Data plus a trailing newline to the process's port.
 +writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
 +    port_command(OsProc#os_proc.port, [Data, $\n]).
 +
 +% Read one full line from the port. The port is opened with
 +% {line, 4096}, so long lines arrive as a series of noeol chunks
 +% terminated by an eol chunk; this accumulates them. Returns an iolist
 +% (or single-element binary list). Throws {os_process_error, _} and
 +% closes the port on any port error or on timeout.
 +readline(#os_proc{} = OsProc) ->
 +    readline(OsProc, []).
 +readline(#os_proc{port = Port} = OsProc, Acc) ->
 +    receive
 +    % Acc is a binary once the first binary chunk arrived: append.
 +    {Port, {data, {noeol, Data}}} when is_binary(Acc) ->
 +        readline(OsProc, <<Acc/binary,Data/binary>>);
 +    % First chunk and it's a binary: switch the accumulator to binary.
 +    {Port, {data, {noeol, Data}}} when is_binary(Data) ->
 +        readline(OsProc, Data);
 +    % Non-binary chunks accumulate as a reversed list.
 +    {Port, {data, {noeol, Data}}} ->
 +        readline(OsProc, [Data|Acc]);
 +    {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) ->
 +        [<<Acc/binary,Data/binary>>];
 +    {Port, {data, {eol, Data}}} when is_binary(Data) ->
 +        [Data];
 +    {Port, {data, {eol, Data}}} ->
 +        lists:reverse(Acc, Data);
 +    {Port, Err} ->
 +        catch port_close(Port),
 +        throw({os_process_error, Err})
 +    after OsProc#os_proc.timeout ->
 +        catch port_close(Port),
 +        throw({os_process_error, "OS process timed out."})
 +    end.
 +
 +% Standard JSON functions
 +% Standard JSON functions
 +% Encode Data as JSON and send it as one line to the OS process.
 +writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
 +    JsonData = ?JSON_ENCODE(Data),
 +    ?LOG_DEBUG("OS Process ~p Input  :: ~s", [OsProc#os_proc.port, JsonData]),
 +    true = writeline(OsProc, JsonData).
 +
 +% Read one JSON line from the OS process. In-band "log" messages are
 +% logged and skipped (recursing for the next line); "error"/"fatal"
 +% messages are converted into throws; anything else is returned as
 +% {json, RawLine} so the caller decodes lazily.
 +readjson(OsProc) when is_record(OsProc, os_proc) ->
 +    Line = iolist_to_binary(readline(OsProc)),
 +    ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
 +    try
 +        % Don't actually parse the whole JSON. Just try to see if it's
 +        % a command or a doc map/reduce/filter/show/list/update output.
 +        % If it's a command then parse the whole JSON and execute the
 +        % command, otherwise return the raw JSON line to the caller.
 +        pick_command(Line)
 +    catch
 +    throw:abort ->
 +        % Not a command: hand the raw line back undecoded.
 +        {json, Line};
 +    throw:{cmd, _Cmd} ->
 +        case ?JSON_DECODE(Line) of
 +        [<<"log">>, Msg] when is_binary(Msg) ->
 +            % we got a message to log. Log it and continue
 +            ?LOG_INFO("OS Process ~p Log :: ~s", [OsProc#os_proc.port, Msg]),
 +            readjson(OsProc);
 +        [<<"error">>, Id, Reason] ->
 +            throw({error, {couch_util:to_existing_atom(Id),Reason}});
 +        [<<"fatal">>, Id, Reason] ->
 +            ?LOG_INFO("OS Process ~p Fatal Error :: ~s ~p",
 +                [OsProc#os_proc.port, Id, Reason]),
 +            throw({couch_util:to_existing_atom(Id),Reason});
 +        _Result ->
 +            % Looked like a command but decoded to something else.
 +            {json, Line}
 +        end
 +    end.
 +
 +% Streaming peek at the first JSON array element: throws {cmd, Cmd}
 +% for the known command tags, or abort for anything else, without
 +% parsing the rest of the line.
 +pick_command(Line) ->
 +    json_stream_parse:events(Line, fun pick_command0/1).
 +
 +% Commands are always JSON arrays; anything else aborts immediately.
 +pick_command0(array_start) ->
 +    fun pick_command1/1;
 +pick_command0(_) ->
 +    throw(abort).
 +
 +% Recognized command tags; throw unwinds out of the stream parser.
 +pick_command1(<<"log">> = Cmd) ->
 +    throw({cmd, Cmd});
 +pick_command1(<<"error">> = Cmd) ->
 +    throw({cmd, Cmd});
 +pick_command1(<<"fatal">> = Cmd) ->
 +    throw({cmd, Cmd});
 +pick_command1(_) ->
 +    throw(abort).
 +
 +
 +% gen_server API
 +% gen_server API
 +% Spawn the external process via the couchspawnkillable wrapper, which
 +% first prints a "kill command" line we read back; a watcher process is
 +% spawned to run that command when this server dies, guaranteeing the
 +% OS process is cleaned up. Options may override the reader/writer
 +% callbacks and the per-read timeout.
 +init([Command, Options, PortOptions]) ->
 +    PrivDir = couch_util:priv_dir(),
 +    Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
 +    % Idle limit (seconds in config, milliseconds here) doubles as the
 +    % gen_server timeout returned from every callback.
 +    V = config:get("query_server_config", "os_process_idle_limit", "300"),
 +    IdleLimit = list_to_integer(V) * 1000,
 +    BaseProc = #os_proc{
 +        command=Command,
 +        port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
 +        writer=fun ?MODULE:writejson/2,
 +        reader=fun ?MODULE:readjson/1,
 +        idle=IdleLimit
 +    },
 +    % First line from couchspawnkillable is the kill command.
 +    KillCmd = iolist_to_binary(readline(BaseProc)),
 +    Pid = self(),
 +    ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]),
 +    spawn(fun() ->
 +            % this ensure the real os process is killed when this process dies.
 +            erlang:monitor(process, Pid),
 +            receive _ -> ok end,
 +            killer(?b2l(KillCmd))
 +        end),
 +    % Apply caller-supplied overrides; unknown options crash init
 +    % (no catch-all clause), which is intentional.
 +    OsProc =
 +    lists:foldl(fun(Opt, Proc) ->
 +        case Opt of
 +        {writer, Writer} when is_function(Writer) ->
 +            Proc#os_proc{writer=Writer};
 +        {reader, Reader} when is_function(Reader) ->
 +            Proc#os_proc{reader=Reader};
 +        {timeout, TimeOut} when is_integer(TimeOut) ->
 +            Proc#os_proc{timeout=TimeOut}
 +        end
 +    end, BaseProc, Options),
 +    {ok, OsProc, IdleLimit}.
 +
 +% Close the port on shutdown; the spawned killer process (see init/1)
 +% takes care of the actual OS-level process.
 +terminate(_Reason, #os_proc{port=Port}) ->
 +    catch port_close(Port),
 +    ok.
 +
- handle_call(get_os_proc, _From, #os_proc{idle=Idle}=OsProc) ->
-     {reply, OsProc, OsProc, Idle};
 +% Every reply re-arms the idle timeout so handle_info(timeout, ...)
 +% fires only after Idle ms of inactivity.
 +handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) ->
 +    {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle};
 +handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) ->
 +    #os_proc{writer=Writer, reader=Reader} = OsProc,
 +    try
 +        Writer(OsProc, Data),
 +        {reply, {ok, Reader(OsProc)}, OsProc, Idle}
 +    catch
 +        % Recoverable error from the OS process: report to caller only.
 +        throw:{error, OsError} ->
 +            {reply, OsError, OsProc, Idle};
 +        % Fatal or unexpected error: reply, then stop this server.
 +        throw:{fatal, OsError} ->
 +            {stop, normal, OsError, OsProc};
 +        throw:OtherError ->
 +            {stop, normal, OtherError, OsProc}
 +    after
 +        % Prompts can touch large JSON terms; reclaim eagerly.
 +        garbage_collect()
 +    end.
 +
 +% {send, Data}: one-way write; a write failure is logged and stops the
 +% server since there is no caller to reply to.
 +handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) ->
 +    try
 +        Writer(OsProc, Data),
 +        {noreply, OsProc, Idle}
 +    catch
 +        throw:OsError ->
 +            ?LOG_ERROR("Failed sending data: ~p -> ~p", [Data, OsError]),
 +            {stop, normal, OsProc}
 +    end;
 +handle_cast(stop, OsProc) ->
 +    {stop, normal, OsProc};
 +handle_cast(Msg, #os_proc{idle=Idle}=OsProc) ->
 +    % Unknown casts are logged and dropped.
 +    ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]),
 +    {noreply, OsProc, Idle}.
 +
 +% Idle timeout: notify the proc manager so it can reap idle workers,
 +% and reclaim memory while we're quiet.
 +handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
 +    gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
 +    erlang:garbage_collect(),
 +    {noreply, OsProc, Idle};
 +% exit_status arrives because the port was opened with the exit_status
 +% option; 0 is a clean exit, anything else propagates as the stop reason.
 +handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
 +    ?LOG_INFO("OS Process terminated normally", []),
 +    {stop, normal, OsProc};
 +handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
 +    ?LOG_ERROR("OS Process died with status: ~p", [Status]),
 +    {stop, {exit_status, Status}, OsProc};
 +handle_info(Msg, #os_proc{idle=Idle}=OsProc) ->
 +    ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]),
 +    {noreply, OsProc, Idle}.
 +
 +% Upgrade path from the pre-'idle' 6-field #os_proc record: rebuild the
 +% record and fill the new idle field from config. Any other state is
 +% passed through unchanged.
 +code_change(_, {os_proc, Cmd, Port, W, R, Timeout} , _) ->
 +    V = config:get("query_server_config","os_process_idle_limit","300"),
 +    State = #os_proc{
 +        command = Cmd,
 +        port = Port,
 +        writer = W,
 +        reader = R,
 +        timeout = Timeout,
 +        idle = list_to_integer(V) * 1000
 +    },
 +    {ok, State};
 +code_change(_OldVsn, State, _Extra) ->
 +    {ok, State}.
 +
 +% Loop run by the watcher spawned in init/1: any message (the 'DOWN'
 +% from the monitored server) triggers the kill command; otherwise poll
 +% every second via a fully-qualified call so hot code upgrades apply.
 +killer(KillCmd) ->
 +    receive _ ->
 +        os:cmd(KillCmd)
 +    after 1000 ->
 +        ?MODULE:killer(KillCmd)
 +    end.
 +

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_query_servers.erl
----------------------------------------------------------------------
diff --cc src/couch_query_servers.erl
index 4fef028,0000000..5b1f77d
mode 100644,000000..100644
--- a/src/couch_query_servers.erl
+++ b/src/couch_query_servers.erl
@@@ -1,479 -1,0 +1,474 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_query_servers).
 +
 +-export([try_compile/4]).
- -export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
++-export([start_doc_map/3, map_docs/2, stop_doc_map/1, raw_to_ejson/1]).
 +-export([reduce/3, rereduce/3,validate_doc_update/5]).
 +-export([filter_docs/5]).
 +-export([filter_view/3]).
 +
 +-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
 +
 +% For 210-os-proc-pool.t
 +-export([get_os_process/1, ret_os_process/1]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-define(SUMERROR, <<"The _sum function requires that map values be numbers, "
 +    "arrays of numbers, or objects, not '~p'. Objects cannot be mixed with other "
 +    "data structures. Objects can be arbitrarily nested, provided that the values "
 +    "for all fields are themselves numbers, arrays of numbers, or objects.">>).
 +
 +-define(STATERROR, <<"The _stats function requires that map values be numbers "
 +    "or arrays of numbers, not '~p'">>).
 +
 +% https://gist.github.com/df10284c76d85f988c3f
 +-define(SUMREGEX, {re_pattern,3,0,<<69,82,67,80,194,0,0,0,8,0,0,0,5,0,0,0,3,0,
 +2,0,0,0,125,2,48,0,9,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,118,97,108,
 +117,101,115,0,93,0,130,65,9,27,102,27,117,27,110,27,99,27,116,27,105,27,111,27,
 +110,102,94,0,9,0,1,66,9,58,11,84,0,9,65,9,27,40,65,9,58,11,65,9,27,44,56,9,94,
 +0,7,0,2,58,11,84,0,7,102,94,0,15,0,3,65,9,27,44,65,9,58,11,56,9,84,0,15,65,9,
 +27,41,65,9,27,123,65,9,27,114,27,101,27,116,27,117,27,114,27,110,66,9,27,115,
 +27,117,27,109,65,9,27,40,56,9,80,0,2,65,9,27,41,56,9,34,59,65,9,27,125,56,9,84,
 +0,130,0,0,0,0>>}).
 +
 +% https://gist.github.com/cbd73238b671325f5a6f
 +-define(COUNTREGEX, {re_pattern,8,0,<<69,82,67,80,30,2,0,0,8,0,0,0,5,0,0,0,8,0,
 +4,0,0,0,125,2,48,0,11,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,114,101,
 +114,101,100,117,99,101,0,0,2,118,97,108,117,101,115,0,101,0,93,1,206,65,9,27,
 +102,27,117,27,110,27,99,27,116,27,105,27,111,27,110,102,94,0,9,0,1,66,9,58,11,
 +84,0,9,65,9,27,40,65,9,58,11,65,9,27,44,56,9,94,0,7,0,2,58,11,84,0,7,102,94,0,
 +23,0,3,65,9,27,44,56,9,94,0,7,0,4,58,11,84,0,7,56,9,84,0,23,65,9,27,41,65,9,27,
 +123,56,9,94,0,136,0,5,94,0,128,0,6,27,105,27,102,65,9,27,40,56,9,80,0,4,65,9,
 +27,41,56,9,34,123,65,9,27,114,27,101,27,116,27,117,27,114,27,110,66,9,27,115,
 +27,117,27,109,65,9,27,40,56,9,80,0,2,65,9,27,41,56,9,34,59,56,9,34,125,65,9,27,
 +101,27,108,27,115,27,101,56,9,34,123,65,9,27,114,27,101,27,116,27,117,27,114,
 +27,110,58,9,80,0,2,65,9,27,46,65,9,27,108,27,101,27,110,27,103,27,116,27,104,
 +56,9,34,59,56,9,34,125,84,0,128,83,0,138,94,0,132,0,7,27,105,27,102,65,9,27,40,
 +65,9,27,33,56,9,80,0,4,65,9,27,41,56,9,34,123,65,9,27,114,27,101,27,116,27,117,
 +27,114,27,110,58,9,80,0,2,65,9,27,46,65,9,27,108,27,101,27,110,27,103,27,116,
 +27,104,56,9,34,59,56,9,34,125,65,9,27,101,27,108,27,115,27,101,56,9,34,123,65,
 +9,27,114,27,101,27,116,27,117,27,114,27,110,66,9,27,115,27,117,27,109,65,9,27,
 +40,56,9,80,0,2,65,9,27,41,56,9,34,59,56,9,34,125,84,0,132,83,0,84,94,0,78,0,8,
 +27,114,27,101,27,116,27,117,27,114,27,110,58,9,80,0,4,65,9,27,63,65,9,27,115,
 +27,117,27,109,65,9,27,40,56,9,80,0,2,65,9,27,41,65,9,27,58,56,9,80,0,2,65,9,27,
 +46,65,9,27,108,27,101,27,110,27,103,27,116,27,104,56,9,34,59,84,0,78,84,1,102,
 +65,9,27,125,56,9,84,1,206,0,0,0,0,0,0,0>>}).
 +
 +
 +% Ask the query server to compile FunctionSource; on a compilation
 +% error, re-throw with a message naming the function type and view so
 +% the user sees where the bad source lives.
 +try_compile(Proc, FunctionType, FunctionName, FunctionSource) ->
 +    try
 +        proc_prompt(Proc, [<<"add_fun">>, FunctionSource]),
 +        ok
 +    catch {compilation_error, E} ->
 +        Fmt = "Compilation of the ~s function in the '~s' view failed: ~s",
 +        Msg = io_lib:format(Fmt, [FunctionType, FunctionName, E]),
 +        throw({compilation_error, Msg})
 +    end.
 +
 +% Check out an OS process for Lang and load it with the view's shared
 +% lib (if any) plus every map function source. Caller must eventually
 +% release it with stop_doc_map/1.
 +start_doc_map(Lang, Functions, Lib) ->
 +    Proc = get_os_process(Lang),
 +    case Lib of
 +    {[]} -> ok;
 +    Lib ->
 +        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
 +    end,
 +    lists:foreach(fun(FunctionSource) ->
 +        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
 +    end, Functions),
 +    {ok, Proc}.
 +
 +% Run every loaded map function over each doc in Docs, converting the
 +% query server's [[K,V],...] pairs into [{K,V},...] tuples. Returns
 +% {ok, [PerDocResults]} with one entry per doc, each a list with one
 +% entry per map function.
 +map_docs(Proc, Docs) ->
 +    % send the documents
 +    Results = lists:map(
 +        fun(Doc) ->
 +            Json = couch_doc:to_json_obj(Doc, []),
 +
 +            FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]),
 +            % the results are a json array of function map yields like this:
 +            % [FunResults1, FunResults2 ...]
 +            % where funresults is are json arrays of key value pairs:
 +            % [[Key1, Value1], [Key2, Value2]]
 +            % Convert the key, value pairs to tuples like
 +            % [{Key1, Value1}, {Key2, Value2}]
 +            lists:map(
 +                fun(FunRs) ->
 +                    [list_to_tuple(FunResult) || FunResult <- FunRs]
 +                end,
 +            FunsResults)
 +        end,
 +        Docs),
 +    {ok, Results}.
 +
- map_docs_raw(Proc, DocList) ->
-     {Mod, Fun} = Proc#proc.prompt_many_fun,
-     CommandList = lists:map(
-         fun(Doc) ->
-             EJson = couch_doc:to_json_obj(Doc, []),
-             [<<"map_doc">>, EJson]
-         end,
-         DocList),
-     Mod:Fun(Proc#proc.pid, CommandList).
++% Map a single doc, returning the raw (undecoded) query-server reply —
++% either {json, Line} or ejson, per proc_prompt_raw/2.
++% NOTE(review): map_doc_raw/2 is not in this module's export list in
++% this hunk; verify it is exported (or used locally) elsewhere,
++% otherwise it is dead code and will trigger an unused-function warning.
++map_doc_raw(Proc, Doc) ->
++    Json = couch_doc:to_json_obj(Doc, []),
++    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
++
 +
 +% Return the OS process checked out by start_doc_map/3; nil is a no-op.
 +stop_doc_map(nil) ->
 +    ok;
 +stop_doc_map(Proc) ->
 +    ok = ret_os_process(Proc).
 +
 +% Transpose a list of per-row reduction lists into per-function groups:
 +% [[A1,B1],[A2,B2]] -> [[A1,A2],[B1,B2]], so each reduce function sees
 +% all of its own values together.
 +group_reductions_results([]) ->
 +    [];
 +group_reductions_results(List) ->
 +    % Peel the head off every sublist in one pass.
 +    {Heads, Tails} = lists:foldl(
 +        fun([H|T], {HAcc,TAcc}) ->
 +            {[H|HAcc], [T|TAcc]}
 +        end, {[], []}, List),
 +    case Tails of
 +    [[]|_] -> % no tails left
 +        [Heads];
 +    _ ->
 +     [Heads | group_reductions_results(Tails)]
 +    end.
 +
 +% Re-reduce previously reduced values. Builtin reductions (names
 +% starting with "_") run in Erlang; everything else goes to the OS
 +% query server for Lang. Sources that are textual equivalents of
 +% builtins are first swapped for the builtin (see
 +% replace_builtin_equivalents/1).
 +rereduce(_Lang, [], _ReducedValues) ->
 +    {ok, []};
 +rereduce(Lang, RedSrcs, ReducedValues) ->
 +    Grouped = group_reductions_results(ReducedValues),
 +    Results = lists:zipwith(
 +        fun
 +        (<<"_", _/binary>> = FunSrc, Values) ->
 +            % Builtins expect [[Key, Value], ...]; keys are irrelevant
 +            % during rereduce, hence the empty-list placeholder.
 +            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
 +            Result;
 +        (FunSrc, Values) ->
 +            os_rereduce(Lang, [FunSrc], Values)
 +        end, replace_builtin_equivalents(RedSrcs), Grouped),
 +    {ok, Results}.
 +
 +% First-pass reduce over raw KVs. Partitions sources into OS-process
 +% and builtin reductions, runs both, then re-interleaves the results in
 +% the original source order.
 +reduce(_Lang, [], _KVs) ->
 +    {ok, []};
 +reduce(Lang, RedSrcs0, KVs) ->
 +    RedSrcs = replace_builtin_equivalents(RedSrcs0),
 +    {OsRedSrcs, BuiltinReds} = lists:partition(fun
 +        (<<"_", _/binary>>) -> false;
 +        (_OsFun) -> true
 +    end, RedSrcs),
 +    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
 +    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
 +    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
 +
 +% Swap user-written JavaScript reduce sources that match the canonical
 +% sum/count implementations (via the precompiled ?SUMREGEX/?COUNTREGEX
 +% patterns) for the much faster builtin _sum/_count. Sources already
 +% naming a builtin pass through untouched.
 +replace_builtin_equivalents([<<"_", _/binary>> = R | Rest]) ->
 +    [R | replace_builtin_equivalents(Rest)];
 +replace_builtin_equivalents([OsFun | Rest]) ->
 +    case re:run(OsFun, ?SUMREGEX) of nomatch ->
 +        case re:run(OsFun, ?COUNTREGEX) of nomatch ->
 +            [OsFun | replace_builtin_equivalents(Rest)];
 +        {match, _} ->
 +            [<<"_count">> | replace_builtin_equivalents(Rest)]
 +        end;
 +    {match, _} ->
 +        [<<"_sum">> | replace_builtin_equivalents(Rest)]
 +    end;
 +replace_builtin_equivalents([]) ->
 +    [].
 +
 +% Zip builtin and OS-process reduction results back into the original
 +% RedSrcs order: "_"-prefixed sources consume from the builtin list,
 +% everything else from the OS list.
 +recombine_reduce_results([], [], [], Acc) ->
 +    {ok, lists:reverse(Acc)};
 +recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
 +    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
 +recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
 +    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
 +
 +% Run a first-pass reduce in the external query server. The process is
 +% always returned to the pool (after), even when the prompt throws.
 +os_reduce(_Lang, [], _KVs) ->
 +    {ok, []};
 +os_reduce(Lang, OsRedSrcs, KVs) ->
 +    Proc = get_os_process(Lang),
 +    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
 +        [true, Reductions] -> Reductions
 +    after
 +        ok = ret_os_process(Proc)
 +    end,
 +    {ok, OsResults}.
 +
 +% Same for rereduce; a single source is passed, so a single reduction
 +% is unwrapped from the reply.
 +os_rereduce(Lang, OsRedSrcs, KVs) ->
 +    Proc = get_os_process(Lang),
 +    try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
 +        [true, [Reduction]] -> Reduction
 +    after
 +        ok = ret_os_process(Proc)
 +    end.
 +
 +
 +% Evaluate the builtin reductions (_sum, _count, _stats) in Erlang.
 +% Re is 'reduce' or 'rereduce'; _count sums pre-counted values on
 +% rereduce instead of counting rows again.
 +builtin_reduce(_Re, [], _KVs, Acc) ->
 +    {ok, lists:reverse(Acc)};
 +builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) ->
 +    Sum = builtin_sum_rows(KVs),
 +    builtin_reduce(Re, BuiltinReds, KVs, [Sum|Acc]);
 +builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
 +    Count = length(KVs),
 +    builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
 +builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
 +    Count = builtin_sum_rows(KVs),
 +    builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]);
 +builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) ->
 +    Stats = builtin_stats(Re, KVs),
 +    builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]).
 +
 +% Fold sum_values/2 over the values of [[Key, Value], ...] rows.
 +builtin_sum_rows(KVs) ->
 +    lists:foldl(fun([_Key, Value], Acc) -> sum_values(Value, Acc) end, 0, KVs).
 +
 +% Pairwise _sum combinator: numbers add, lists add element-wise (the
 +% shorter list is zero-padded), objects merge recursively by key.
 +% Mixing objects with non-objects throws ?SUMERROR.
 +sum_values({Props}, 0) ->
 +    % First object against the initial 0 accumulator.
 +    {Props};
 +sum_values({Props}, {AccProps}) ->
 +    {sum_objects(lists:sort(Props), lists:sort(AccProps))};
 +sum_values(Value, Acc) when is_number(Value), is_number(Acc) ->
 +    Acc + Value;
 +sum_values(Value, Acc) when is_list(Value), is_list(Acc) ->
 +    sum_arrays(Acc, Value);
 +sum_values(Value, Acc) when is_number(Value), is_list(Acc) ->
 +    sum_arrays(Acc, [Value]);
 +sum_values(Value, Acc) when is_list(Value), is_number(Acc) ->
 +    sum_arrays([Acc], Value);
 +sum_values(Else, _Acc) ->
 +    throw_sum_error(Else).
 +
 +% Merge two key-sorted proplists, summing values for shared keys and
 +% keeping unmatched keys as-is.
 +sum_objects([{K1, V1} | Rest1], [{K1, V2} | Rest2]) ->
 +    [{K1, sum_values(V1, V2)} | sum_objects(Rest1, Rest2)];
 +sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 < K2 ->
 +    [{K1, V1} | sum_objects(Rest1, [{K2, V2} | Rest2])];
 +sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 > K2 ->
 +    [{K2, V2} | sum_objects([{K1, V1} | Rest1], Rest2)];
 +sum_objects([], Rest) ->
 +    Rest;
 +sum_objects(Rest, []) ->
 +    Rest.
 +
 +% Element-wise numeric addition of two lists; the tail of the longer
 +% list is carried through unchanged. Non-numbers throw ?SUMERROR.
 +sum_arrays([], []) ->
 +    [];
 +sum_arrays([_|_]=Xs, []) ->
 +    Xs;
 +sum_arrays([], [_|_]=Ys) ->
 +    Ys;
 +sum_arrays([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) ->
 +    [X+Y | sum_arrays(Xs,Ys)];
 +sum_arrays(Else, _) ->
 +    throw_sum_error(Else).
 +
 +% _stats builtin: fold all row values into {Sum, Count, Min, Max,
 +% SumSqr} tuples (or lists thereof for array-valued rows), then pack
 +% into the ejson result object. Empty input yields all-zero stats.
 +builtin_stats(_, []) ->
 +    {[{sum,0}, {count,0}, {min,0}, {max,0}, {sumsqr,0}]};
 +builtin_stats(_, [[_,First]|Rest]) ->
 +    Unpacked = lists:foldl(fun([_Key, Value], Acc) -> stat_values(Value, Acc) end,
 +                           build_initial_accumulator(First), Rest),
 +    pack_stats(Unpacked).
 +
 +% Combine one value into the running stats accumulator. Handles plain
 +% numbers, lists of values (element-wise), and pre-reduced stats
 +% objects (rereduce input); anything else throws ?STATERROR.
 +stat_values(Value, Acc) when is_list(Value), is_list(Acc) ->
 +    lists:zipwith(fun stat_values/2, Value, Acc);
 +stat_values({PreRed}, Acc) when is_list(PreRed) ->
 +    % A {proplist} is a previously-packed stats object: unpack first.
 +    stat_values(unpack_stats({PreRed}), Acc);
 +stat_values(Value, Acc) when is_number(Value) ->
 +    % Promote the bare number to a singleton stats tuple.
 +    stat_values({Value, 1, Value, Value, Value*Value}, Acc);
 +stat_values(Value, Acc) when is_number(Acc) ->
 +    stat_values(Value, {Acc, 1, Acc, Acc, Acc*Acc});
 +stat_values(Value, Acc) when is_tuple(Value), is_tuple(Acc) ->
 +    {Sum0, Cnt0, Min0, Max0, Sqr0} = Value,
 +    {Sum1, Cnt1, Min1, Max1, Sqr1} = Acc,
 +    {
 +      Sum0 + Sum1,
 +      Cnt0 + Cnt1,
 +      erlang:min(Min0, Min1),
 +      erlang:max(Max0, Max1),
 +      Sqr0 + Sqr1
 +    };
 +stat_values(Else, _Acc) ->
 +    throw_stat_error(Else).
 +
 +% Seed the stats accumulator from the first row's value: numbers become
 +% singleton tuples, lists recurse element-wise, packed stats objects
 +% are unpacked; anything else is rejected with invalid_value.
 +build_initial_accumulator(L) when is_list(L) ->
 +    [build_initial_accumulator(X) || X <- L];
 +build_initial_accumulator(X) when is_number(X) ->
 +    {X, 1, X, X, X*X};
 +build_initial_accumulator({Props}) ->
 +    unpack_stats({Props});
 +build_initial_accumulator(Else) ->
 +    Msg = io_lib:format("non-numeric _stats input: ~w", [Else]),
 +    throw({invalid_value, iolist_to_binary(Msg)}).
 +
 +% Convert an ejson stats object into the internal 5-tuple.
 +unpack_stats({PreRed}) when is_list(PreRed) ->
 +    {
 +      get_number(<<"sum">>, PreRed),
 +      get_number(<<"count">>, PreRed),
 +      get_number(<<"min">>, PreRed),
 +      get_number(<<"max">>, PreRed),
 +      get_number(<<"sumsqr">>, PreRed)
 +    }.
 +
 +% Inverse of unpack_stats/1; maps over lists for array-valued stats.
 +pack_stats({Sum, Cnt, Min, Max, Sqr}) ->
 +    {[{<<"sum">>,Sum}, {<<"count">>,Cnt}, {<<"min">>,Min}, {<<"max">>,Max}, {<<"sumsqr">>,Sqr}]};
 +pack_stats(Stats) when is_list(Stats) ->
 +    lists:map(fun pack_stats/1, Stats).
 +
 +% Fetch a required numeric field from a stats proplist. Falls back to
 +% an atom key when the binary key is absent (keys may be atoms after an
 +% earlier decode); the atom is created from a fixed set of field names,
 +% so the unbounded-atom concern does not apply here. Missing or
 +% non-numeric fields throw invalid_value with a descriptive message.
 +get_number(Key, Props) ->
 +    case couch_util:get_value(Key, Props) of
 +    X when is_number(X) ->
 +        X;
 +    undefined when is_binary(Key) ->
 +        get_number(binary_to_atom(Key, latin1), Props);
 +    undefined ->
 +        Msg = io_lib:format("user _stats input missing required field ~s (~p)",
 +            [Key, Props]),
 +        throw({invalid_value, iolist_to_binary(Msg)});
 +    Else ->
 +        Msg = io_lib:format("non-numeric _stats input received for ~s: ~w",
 +            [Key, Else]),
 +        throw({invalid_value, iolist_to_binary(Msg)})
 +    end.
 +
 +% use the function stored in ddoc.validate_doc_update to test an update.
 +validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
 +    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
 +    JsonDiskDoc = json_doc(DiskDoc),
 +    case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]) of
 +        1 ->
 +            ok;
 +        {[{<<"forbidden">>, Message}]} ->
 +            throw({forbidden, Message});
 +        {[{<<"unauthorized">>, Message}]} ->
 +            throw({unauthorized, Message});
 +        Message when is_binary(Message) ->
 +            throw({unknown_error, Message})
 +    end.
 +
 +json_doc(nil) -> null;
 +json_doc(Doc) ->
 +    couch_doc:to_json_obj(Doc, [revs]).
 +
 +% Filter Docs through a view's map function (the "_view" filter):
 +% returns {ok, Passes} with one boolean-ish entry per doc.
 +filter_view(DDoc, VName, Docs) ->
 +    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
 +    [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
 +    {ok, Passes}.
 +
 +% Filter Docs through a ddoc filter function. Req is either a
 +% pre-built {json_req, Obj} or a live #httpd{} request which is
 +% converted to the external JSON request form first.
 +filter_docs(Req, Db, DDoc, FName, Docs) ->
 +    JsonReq = case Req of
 +    {json_req, JsonObj} ->
 +        JsonObj;
 +    #httpd{} = HttpReq ->
 +        couch_httpd_external:json_req_obj(HttpReq, Db)
 +    end,
 +    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
 +    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
 +        [JsonDocs, JsonReq]),
 +    {ok, Passes}.
 +
 +% Prompt a process that already has the ddoc loaded (see
 +% with_ddoc_proc/2 for how the pair is obtained).
 +ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
 +    proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
 +
 +% One-shot ddoc prompt: checks a process out, prompts, and returns it.
 +ddoc_prompt(DDoc, FunPath, Args) ->
 +    with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
 +        proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
 +    end).
 +
 +% Check out a process keyed on {DDocId, Rev}, run Fun({Proc, DDocId}),
 +% and always return the process to the pool afterwards.
 +with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
 +    Rev = couch_doc:rev_to_str({Start, DiskRev}),
 +    DDocKey = {DDocId, Rev},
 +    Proc = get_ddoc_process(DDoc, DDocKey),
 +    try Fun({Proc, DDocId})
 +    after
 +        ok = ret_os_process(Proc)
 +    end.
 +
 +% Prompt and decode: raw {json, Line} replies are JSON-decoded, ejson
 +% replies pass through as-is.
 +proc_prompt(Proc, Args) ->
 +     case proc_prompt_raw(Proc, Args) of
 +     {json, Json} ->
 +         ?JSON_DECODE(Json);
 +     EJson ->
 +         EJson
 +     end.
 +
 +% Dispatch through the #proc record's configured prompt callback.
 +proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) ->
 +    apply(Mod, Func, [Proc#proc.pid, Args]).
 +
 +% Decode a raw prompt reply on demand (counterpart to proc_prompt_raw).
 +raw_to_ejson({json, Json}) ->
 +    ?JSON_DECODE(Json);
 +raw_to_ejson(EJson) ->
 +    EJson.
 +
 +% Stop the worker via its configured stop callback.
 +proc_stop(Proc) ->
 +    {Mod, Func} = Proc#proc.stop_fun,
 +    apply(Mod, Func, [Proc#proc.pid]).
 +
 +% Set the worker's read timeout via its configured callback.
 +proc_set_timeout(Proc, Timeout) ->
 +    {Mod, Func} = Proc#proc.set_timeout_fun,
 +    apply(Mod, Func, [Proc#proc.pid, Timeout]).
 +
 +% Check a query-server process out of couch_proc_manager for this ddoc.
 +% The process is reset with the current query config; if the reset
 +% fails (e.g. the worker died), the stale process is stopped and we
 +% recurse to fetch a fresh one.
 +get_ddoc_process(#doc{} = DDoc, DDocKey) ->
 +    % remove this case statement
 +    case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}, infinity) of
 +    {ok, Proc, {QueryConfig}} ->
 +        % process knows the ddoc
 +        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
 +        true ->
 +            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
 +            Proc;
 +        _ ->
 +            % Reset failed: discard this worker and retry with a new one.
 +            catch proc_stop(Proc),
 +            get_ddoc_process(DDoc, DDocKey)
 +        end;
 +    Error ->
 +        throw(Error)
 +    end.
 +
 +% Same checkout/reset/retry dance keyed only on the query language.
 +get_os_process(Lang) ->
 +    case gen_server:call(couch_proc_manager, {get_proc, Lang}, infinity) of
 +    {ok, Proc, {QueryConfig}} ->
 +        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
 +        true ->
 +            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
 +            Proc;
 +        _ ->
 +            catch proc_stop(Proc),
 +            get_os_process(Lang)
 +        end;
 +    Error ->
 +        throw(Error)
 +    end.
 +
 +% Return a checked-out process to the pool and unlink from it (the
 +% unlink may fail if the worker already died; that's fine).
 +ret_os_process(Proc) ->
 +    true = gen_server:call(couch_proc_manager, {ret_proc, Proc}, infinity),
 +    catch unlink(Proc#proc.pid),
 +    ok.
 +
 +% Raise invalid_value with the formatted ?SUMERROR / ?STATERROR text.
 +throw_sum_error(Else) ->
 +    throw({invalid_value, iolist_to_binary(io_lib:format(?SUMERROR, [Else]))}).
 +
 +throw_stat_error(Else) ->
 +    throw({invalid_value, iolist_to_binary(io_lib:format(?STATERROR, [Else]))}).
 +
 +
 +-ifdef(TEST).
 +-include_lib("eunit/include/eunit.hrl").
 +
 +% sum_values/2: numbers, number+list, list+list, and deep object merge
 +% (commutative: X+Y == Y+X == Z).
 +sum_values_test() ->
 +    ?assertEqual(3, sum_values(1, 2)),
 +    ?assertEqual([2,4,6], sum_values(1, [1,4,6])),
 +    ?assertEqual([3,5,7], sum_values([3,2,4], [0,3,3])),
 +    X = {[{<<"a">>,1}, {<<"b">>,[1,2]}, {<<"c">>, {[{<<"d">>,3}]}},
 +            {<<"g">>,1}]},
 +    Y = {[{<<"a">>,2}, {<<"b">>,3}, {<<"c">>, {[{<<"e">>, 5}]}},
 +            {<<"f">>,1}, {<<"g">>,1}]},
 +    Z = {[{<<"a">>,3}, {<<"b">>,[4,2]}, {<<"c">>, {[{<<"d">>,3},{<<"e">>,5}]}},
 +            {<<"f">>,1}, {<<"g">>,2}]},
 +    ?assertEqual(Z, sum_values(X, Y)),
 +    ?assertEqual(Z, sum_values(Y, X)).
 +
 +% stat_values/2: scalar combinations and element-wise list combination
 +% producing {Sum, Count, Min, Max, SumSqr} tuples.
 +stat_values_test() ->
 +    ?assertEqual({1, 2, 0, 1, 1}, stat_values(1, 0)),
 +    ?assertEqual({11, 2, 1, 10, 101}, stat_values(1, 10)),
 +    ?assertEqual([{9, 2, 2, 7, 53},
 +                  {14, 2, 3, 11, 130},
 +                  {18, 2, 5, 13, 194}
 +                 ], stat_values([2,3,5], [7,11,13])).
 +
 +-endif.


Mime
View raw message