couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jch...@apache.org
Subject svn commit: r727132 - in /couchdb/trunk/src/couchdb: Makefile.am couch_db_update_notifier.erl couch_os_process.erl couch_query_servers.erl
Date Tue, 16 Dec 2008 20:31:52 GMT
Author: jchris
Date: Tue Dec 16 12:31:51 2008
New Revision: 727132

URL: http://svn.apache.org/viewvc?rev=727132&view=rev
Log:
couch_os_process to manage the JSON line protocol. thanks davisp.

Added:
    couchdb/trunk/src/couchdb/couch_os_process.erl
Modified:
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_db_update_notifier.erl
    couchdb/trunk/src/couchdb/couch_query_servers.erl

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=727132&r1=727131&r2=727132&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Tue Dec 16 12:31:51 2008
@@ -55,6 +55,7 @@
     couch_httpd_misc_handlers.erl \
     couch_key_tree.erl \
     couch_log.erl \
+    couch_os_process.erl \
     couch_query_servers.erl \
     couch_rep.erl \
     couch_server.erl \
@@ -85,6 +86,7 @@
     couch_httpd_misc_handlers.beam \
     couch_key_tree.beam \
     couch_log.beam \
+    couch_os_process.beam \
     couch_query_servers.beam \
     couch_rep.beam \
     couch_server.beam \

Modified: couchdb/trunk/src/couchdb/couch_db_update_notifier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_update_notifier.erl?rev=727132&r1=727131&r2=727132&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db_update_notifier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db_update_notifier.erl Tue Dec 16 12:31:51 2008
@@ -37,12 +37,14 @@
     couch_event_sup:stop(Pid).
 
 init(Exec) when is_list(Exec) -> % an exe
-    Port = open_port({spawn, Exec}, [stream, exit_status, hide]),
-    {ok, Port};
+    {ok, _Pid} = couch_os_process:start_link(Exec, [], [stream, exit_status, hide]); % start_link already yields {ok, Pid}; wrapping it again made the state {ok, Pid} instead of Pid
 init(Else) ->
     {ok, Else}.
 
-terminate(_Reason, _Port) ->
+terminate(_Reason, Pid) when is_pid(Pid) ->
+    couch_os_process:stop(Pid),
+    ok;
+terminate(_Reason, _State) ->
     ok.
 
 handle_event(Event, Fun) when is_function(Fun, 1) ->
@@ -51,16 +53,17 @@
 handle_event(Event, {Fun, FunAcc}) ->
     FunAcc2 = Fun(Event, FunAcc),
     {ok, {Fun, FunAcc2}};
-handle_event({EventAtom, DbName}, Port) ->
+handle_event({EventAtom, DbName}, Pid) ->
     Obj = {[{type, list_to_binary(atom_to_list(EventAtom))}, {db, DbName}]},
-    true = port_command(Port, ?JSON_ENCODE(Obj) ++ "\n"),
-    {ok, Port}.
+    true = couch_os_process:write(Pid, Obj),
+    {ok, Pid}.
 
 handle_call(_Request, State) ->
-    {ok, ok, State}.
+    {ok, ok, State}. % gen_event handle_call/2 must return {ok, Reply, State}; {reply, ...} is the gen_server form
 
-handle_info({'EXIT', _, _Reason}, _Port) ->
-    remove_handler.
+handle_info({'EXIT', Pid, Reason}, Pid) ->
+    ?LOG_ERROR("Update notification process ~p died: ~p", [Pid, Reason]),
+    remove_handler. % gen_event has no {stop, _} return; remove this handler once its OS process is gone
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

Added: couchdb/trunk/src/couchdb/couch_os_process.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_os_process.erl?rev=727132&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_os_process.erl (added)
+++ couchdb/trunk/src/couchdb/couch_os_process.erl Tue Dec 16 12:31:51 2008
@@ -0,0 +1,161 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_os_process).
+-behaviour(gen_server).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, write/2, read/1, prompt/2, async/3]).
+-export([writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
+
+-record(os_proc,
+    {command,
+     port,
+     writer,
+     reader,
+     timeout
+    }).
+
+start_link(Command) ->
+    start_link(Command, []).
+start_link(Command, Options) ->
+    start_link(Command, Options, ?PORT_OPTIONS).
+start_link(Command, Options, PortOptions) ->
+    gen_server:start_link(couch_os_process, [Command, Options, PortOptions], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+% Read/Write API
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+    ok = gen_server:call(Pid, {set_timeout, TimeOut}).
+
+write(Pid, Data) ->
+    gen_server:call(Pid, {write, Data}).
+
+read(Pid) ->
+    gen_server:call(Pid, read).
+
+prompt(Pid, Data) ->
+    gen_server:call(Pid, {prompt, Data}).
+
+async(Pid, Data, CallBack) ->
+    gen_server:cast(Pid, {async, Data, CallBack}).
+
+% Utility functions for reading and writing
+% in custom functions
+writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
+    port_command(OsProc#os_proc.port, [Data, "\n"]). % send as iodata: Data may be a binary or improper iolist, where ++ raises badarg
+
+readline(OsProc) when is_record(OsProc, os_proc) ->
+    readline(OsProc, []).
+readline(OsProc, Acc) when is_record(OsProc, os_proc) ->
+    #os_proc{port=Port} = OsProc,
+    receive
+    {Port, {data, {noeol, Data}}} ->
+        readline(OsProc, [Data|Acc]);
+    {Port, {data, {eol, Data}}} ->
+        lists:reverse(Acc, Data);
+    {Port, Err} ->
+        catch port_close(Port),
+        throw({os_process_error, Err})
+    after OsProc#os_proc.timeout ->
+        catch port_close(Port),
+        throw({os_process_error, "OS process timed out."})
+    end.
+
+% Standard JSON functions
+writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
+    true = writeline(OsProc, ?JSON_ENCODE(Data)).
+
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+    Line = readline(OsProc),
+    case ?JSON_DECODE(Line) of
+    {[{<<"log">>,Msg}]} when is_binary(Msg) ->
+        % we got a message to log. Log it and continue
+        ?LOG_INFO("OS Process Log Message: ~s", [Msg]),
+        readjson(OsProc);
+    {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
+        throw({list_to_atom(binary_to_list(Id)),Reason});
+    {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
+        throw({list_to_atom(binary_to_list(Id)),Reason});
+    Result ->
+        Result
+    end.
+
+% gen_server API
+init([Command, Options, PortOptions]) ->
+    BaseProc = #os_proc{
+        command=Command,
+        port=open_port({spawn, Command}, PortOptions),
+        writer=fun writejson/2,
+        reader=fun readjson/1,
+        timeout=5000
+    },
+    OsProc =
+    lists:foldl(fun(Opt, Proc) ->
+        case Opt of
+        {writer, Writer} when is_function(Writer) ->
+            Proc#os_proc{writer=Writer};
+        {reader, Reader} when is_function(Reader) ->
+            Proc#os_proc{reader=Reader};
+        {timeout, TimeOut} when is_integer(TimeOut) ->
+            Proc#os_proc{timeout=TimeOut}
+        end
+    end, BaseProc, Options),
+    {ok, OsProc}.
+
+terminate(_Reason, #os_proc{port=nil}) ->
+    ok;
+terminate(_Reason, #os_proc{port=Port}) ->
+    catch port_close(Port),
+    ok.
+
+handle_call({set_timeout, TimeOut}, _From, OsProc) ->
+    {reply, ok, OsProc#os_proc{timeout=TimeOut}};
+handle_call({write, Data}, _From, OsProc) ->
+    Writer = OsProc#os_proc.writer,
+    {reply, Writer(OsProc, Data), OsProc};
+handle_call(read, _From, OsProc) ->
+    Reader = OsProc#os_proc.reader,
+    {reply, Reader(OsProc), OsProc};
+handle_call({prompt, Data}, _From, OsProc) ->
+    #os_proc{writer=Writer, reader=Reader} = OsProc,
+    Writer(OsProc, Data),
+    {reply, Reader(OsProc), OsProc}.
+
+handle_cast({async, Data, CallBack}, OsProc) ->
+    #os_proc{writer=Writer, reader=Reader} = OsProc,
+    Writer(OsProc, Data),
+    CallBack(Reader(OsProc)),
+    {noreply, OsProc};
+handle_cast(stop, OsProc) ->
+    {stop, normal, OsProc};
+handle_cast(Msg, OsProc) ->
+    ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]),
+    {noreply, OsProc}.
+
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+    ?LOG_ERROR("OS Process died with status: ~p", [Status]),
+    {stop, error, OsProc};
+handle_info(Msg, OsProc) ->
+    ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]),
+    {noreply, OsProc}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+

Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=727132&r1=727131&r2=727132&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_query_servers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_query_servers.erl Tue Dec 16 12:31:51 2008
@@ -28,71 +28,20 @@
 stop() ->
     exit(whereis(couch_query_servers), close).
 
-readline(Port) ->
-    readline(Port, []).
-
-readline(Port, Acc) ->
-    case get(query_server_timeout) of
-    undefined ->
-        Timeout = list_to_integer(couch_config:get(
-            "couchdb", "view_timeout", "5000")),
-        put(query_server_timeout, Timeout);
-    Timeout -> ok
-    end,
-    receive
-    {Port, {data, {noeol, Data}}} ->
-        readline(Port, [Data|Acc]);
-    {Port, {data, {eol, Data}}} ->
-        lists:reverse(Acc, Data);
-    {Port, Err} ->
-        catch port_close(Port),
-        throw({external_process_error, Err})
-    after Timeout ->
-        catch port_close(Port),
-        throw({external_process_error, "External process timed out"})
-    end.
-
-read_json(Port) ->
-    Line = readline(Port),
-    case ?JSON_DECODE(Line) of
-    {[{<<"log">>,Msg}]} when is_binary(Msg) ->
-        % we got a message to log. Log it and continue
-        ?LOG_INFO("Query Server Log Message: ~s", [Msg]),
-        read_json(Port);
-    Else ->
-        Else
-    end.
-
-% send command and get a response.
-prompt(Port, Json) ->
-    Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]),
-    true = port_command(Port, Bin),
-    case read_json(Port) of
-    {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
-        throw({Id,Reason});
-    {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
-        throw({Id,Reason});
-    Result ->
-        Result
-    end.
-
-
 start_doc_map(Lang, Functions) ->
-    Port = get_linked_port(Lang),
-    % send the functions as json strings
+    Pid = get_os_process(Lang),
     lists:foreach(fun(FunctionSource) ->
-            true = prompt(Port, [<<"add_fun">>, FunctionSource])
-        end,
-        Functions),
-    {ok, {Lang, Port}}.
+        true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
+    end, Functions),
+    {ok, {Lang, Pid}}.
 
-map_docs({_Lang, Port}, Docs) ->
+map_docs({_Lang, Pid}, Docs) ->
     % send the documents
     Results = lists:map(
         fun(Doc) ->
             Json = couch_doc:to_json_obj(Doc, []),
             
-            FunsResults = prompt(Port, [<<"map_doc">>, Json]),
+            FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
             % the results are a json array of function map yields like this:
             % [FunResults1, FunResults2 ...]
             % where funresults is are json arrays of key value pairs:
@@ -111,30 +60,8 @@
 
 stop_doc_map(nil) ->
     ok;
-stop_doc_map({Lang, Port}) ->
-    return_linked_port(Lang, Port).
-    
-get_linked_port(Lang) ->
-    case gen_server:call(couch_query_servers, {get_port, Lang}) of
-    {ok, Port0} ->
-        link(Port0),
-        true = prompt(Port0, [<<"reset">>]),
-        Port0;
-    {empty, Cmd} ->
-        ?LOG_INFO("Spawning new ~s instance.", [Lang]),
-        open_port({spawn, Cmd}, [stream,
-                                    {line, 1000},
-                                    binary,
-                                    exit_status,
-                                    hide]);
-    Error ->
-        throw(Error)
-    end.
-
-return_linked_port(Lang, Port) ->
-    ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
-    true = unlink(Port),
-    ok.
+stop_doc_map({Lang, Pid}) ->
+    ok = ret_os_process(Lang, Pid).
 
 group_reductions_results([]) ->
     [];
@@ -153,29 +80,29 @@
 rereduce(_Lang, [], _ReducedValues) ->
     {ok, []};
 rereduce(Lang, RedSrcs, ReducedValues) ->
-    Port = get_linked_port(Lang),
+    Pid = get_os_process(Lang),
     Grouped = group_reductions_results(ReducedValues),
     Results = lists:zipwith(
         fun(FunSrc, Values) ->
             [true, [Result]] = 
-                prompt(Port, [<<"rereduce">>, [FunSrc], Values]),
+                couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
             Result
         end, RedSrcs, Grouped),
         
-    return_linked_port(Lang, Port),
+    ok = ret_os_process(Lang, Pid),
     {ok, Results}.
 
 reduce(_Lang, [], _KVs) ->
     {ok, []};
 reduce(Lang, RedSrcs, KVs) ->
-    Port = get_linked_port(Lang),
-    [true, Results] = prompt(Port, 
+    Pid = get_os_process(Lang),
+    [true, Results] = couch_os_process:prompt(Pid, 
             [<<"reduce">>, RedSrcs, KVs]),
-    return_linked_port(Lang, Port),
+    ok = ret_os_process(Lang, Pid),
     {ok, Results}.
 
 validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
-    Port = get_linked_port(Lang),
+    Pid = get_os_process(Lang),
     JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
     JsonDiskDoc =
     if DiskDoc == nil ->
@@ -183,7 +110,7 @@
     true -> 
         couch_doc:to_json_obj(DiskDoc, [revs])
     end,
-    try prompt(Port, 
+    try couch_os_process:prompt(Pid, 
             [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
     1 ->
         ok;
@@ -192,7 +119,7 @@
     {[{<<"unauthorized">>, Message}]} ->
         throw({unauthorized, Message})
     after
-        return_linked_port(Lang, Port)
+        ok = ret_os_process(Lang, Pid)
     end.
 
 init([]) ->
@@ -206,72 +133,101 @@
         fun("query_servers" ++ _, _) ->
             ?MODULE:stop()
         end),
-        
-    QueryServers = couch_config:get("query_servers"),
-    QueryServers2 = 
-        [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers],
-        
-    {ok, {QueryServers2, []}}.
+
+    Langs = ets:new(couch_query_server_langs, [set, private]),
+    PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
+    Pids = ets:new(couch_query_server_procs, [set, private]),
+    InUse = ets:new(couch_query_server_used, [set, private]),
+    lists:foreach(fun({Lang, Command}) ->
+        true = ets:insert(Langs, {?l2b(Lang), Command})
+    end, couch_config:get("query_servers")),
+    {ok, {Langs, PidLangs, Pids, InUse}}.
 
 terminate(_Reason, _Server) ->
     ok.
 
 
-handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
-    case proplists:get_value(Lang, LangPorts) of
-    undefined ->
-        case proplists:get_value(Lang, QueryServerList) of
-        undefined -> % not a supported language
-            {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}};
-        ServerCmd ->
-            {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}
-        end;
-    Port ->
-        Result =
-        case catch port_connect(Port, FromPid) of
-        true ->
-            true = unlink(Port),
-            {ok, Port};
-        Error ->
-            catch port_close(Port),
-            Error
-        end,
-        {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}}
-    end;
-handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
-    case catch port_connect(Port, self()) of
-    true ->
-        {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
+    % Note to future self. Add max process limit.
+    case ets:lookup(Pids, Lang) of
+    [{Lang, [Pid|_]}] ->
+        rem_from_list(Pids, Lang, Pid),
+        add_to_list(InUse, Lang, Pid),
+        true = couch_os_process:prompt(Pid, [<<"reset">>]),
+        {reply, Pid, Server};
     _ ->
-        catch port_close(Port),
-        {reply, ok, {QueryServerList, LangPorts}}
-    end.
-
-handle_cast(_Whatever, {Cmd, Ports}) ->
-    {noreply, {Cmd, Ports}}.
-
-handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
-    case lists:keysearch(Port, 2, LangPorts) of
-    {value, {Lang, _}} ->
+        {ok, Pid} = new_process(Langs, Lang),
+        add_value(PidLangs, Pid, Lang), % register at creation so an 'EXIT' can always map the pid back to its language
+        add_to_list(InUse, Lang, Pid),
+        {reply, Pid, Server}
+    end;
+handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
+    % Along with max process limit, here we should check
+    % if we're over the limit and discard when we are.
+    add_to_list(Pids, Lang, Pid),
+    rem_from_list(InUse, Lang, Pid),
+    {reply, true, Server}.
+
+handle_cast(_Whatever, Server) ->
+    {noreply, Server}.
+
+handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) ->
+    case ets:lookup(PidLangs, Pid) of
+    [{Pid, Lang}] ->
         case Status of
-        0 -> ok;
-        _ -> ?LOG_ERROR("Abnormal shutdown of ~s query server process (exit_status: ~w).",
[Lang, Status])
+        normal -> ok;
+        _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
         end,
-        {noreply, {QueryServerList,  lists:keydelete(Port, 2, LangPorts)}};
-    _ ->
-        ?LOG_ERROR("Unknown linked port/process crash: ~p", [Port])
+        % The ets calls below return true/ok, not the table, so they are run
+        % for their side effects only and the table ids stay in the state.
+        true = ets:delete(PidLangs, Pid),
+        rem_from_list(Pids, Lang, Pid),
+        rem_from_list(InUse, Lang, Pid),
+        {noreply, {Langs, PidLangs, Pids, InUse}};
+    [] ->
+        ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
+        {noreply, {Langs, PidLangs, Pids, InUse}}
     end.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-% test() ->
-%     test("../js/js -f main.js").
-% 
-% test(Cmd) ->
-%     start_link(Cmd),
-%     {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc)
{if (doc[0] == 'a') return doc[1];}">>]),
-%     {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a",
"c"}}]),
-%     io:format("Results: ~w~n", [Results]),
-%     stop_doc_map(DocMap),
-%     ok.
+% Private API
+
+new_process(Langs, Lang) ->
+    Proc =
+    case ets:lookup(Langs, Lang) of
+    [{Lang, Command}] ->
+        couch_os_process:start_link(Command);
+    _ ->
+        throw({unknown_query_language, Lang})
+    end,
+    Proc.
+
+get_os_process(Lang) ->
+    gen_server:call(couch_query_servers, {get_proc, Lang}).
+
+ret_os_process(Lang, Pid) ->
+    true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
+    ok.
+
+add_value(Tid, Key, Value) ->
+    true = ets:insert(Tid, {Key, Value}).
+
+rem_value(Tid, Key) ->
+    true = ets:delete(Tid, Key). % delete the entry; ets:insert/2 requires a tuple and can never remove a key
+
+add_to_list(Tid, Key, Value) ->
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        true = ets:insert(Tid, {Key, [Value|Vals]});
+    [] ->
+        true = ets:insert(Tid, {Key, [Value]})
+    end.
+
+rem_from_list(Tid, Key, Value) ->
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]});
+    [] -> ok
+    end.



Mime
View raw message