Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 76513 invoked from network); 16 Dec 2008 20:32:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Dec 2008 20:32:18 -0000 Received: (qmail 29444 invoked by uid 500); 16 Dec 2008 20:32:31 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 29366 invoked by uid 500); 16 Dec 2008 20:32:30 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 29357 invoked by uid 99); 16 Dec 2008 20:32:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2008 12:32:30 -0800 X-ASF-Spam-Status: No, hits=0.0 required=10.0 tests= X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2008 20:32:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DB360238889B; Tue, 16 Dec 2008 12:31:53 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r727132 - in /couchdb/trunk/src/couchdb: Makefile.am couch_db_update_notifier.erl couch_os_process.erl couch_query_servers.erl Date: Tue, 16 Dec 2008 20:31:52 -0000 To: commits@couchdb.apache.org From: jchris@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081216203153.DB360238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jchris Date: Tue Dec 16 12:31:51 2008 New Revision: 727132 URL: http://svn.apache.org/viewvc?rev=727132&view=rev Log: couch_os_process to manage the JSON line protocol. thanks davisp. Added: couchdb/trunk/src/couchdb/couch_os_process.erl Modified: couchdb/trunk/src/couchdb/Makefile.am couchdb/trunk/src/couchdb/couch_db_update_notifier.erl couchdb/trunk/src/couchdb/couch_query_servers.erl Modified: couchdb/trunk/src/couchdb/Makefile.am URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=727132&r1=727131&r2=727132&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/Makefile.am (original) +++ couchdb/trunk/src/couchdb/Makefile.am Tue Dec 16 12:31:51 2008 @@ -55,6 +55,7 @@ couch_httpd_misc_handlers.erl \ couch_key_tree.erl \ couch_log.erl \ + couch_os_process.erl \ couch_query_servers.erl \ couch_rep.erl \ couch_server.erl \ @@ -85,6 +86,7 @@ couch_httpd_misc_handlers.beam \ couch_key_tree.beam \ couch_log.beam \ + couch_os_process.beam \ couch_query_servers.beam \ couch_rep.beam \ couch_server.beam \ Modified: couchdb/trunk/src/couchdb/couch_db_update_notifier.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_update_notifier.erl?rev=727132&r1=727131&r2=727132&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_db_update_notifier.erl (original) +++ couchdb/trunk/src/couchdb/couch_db_update_notifier.erl Tue Dec 16 12:31:51 2008 @@ -37,12 +37,14 @@ couch_event_sup:stop(Pid). init(Exec) when is_list(Exec) -> % an exe - Port = open_port({spawn, Exec}, [stream, exit_status, hide]), - {ok, Port}; + {ok, couch_os_process:start_link(Exec, [], [stream, exit_status, hide])}; init(Else) -> {ok, Else}. -terminate(_Reason, _Port) -> +terminate(_Reason, Pid) when is_pid(Pid) -> + couch_os_process:stop(Pid), + ok; +terminate(_Reason, _State) -> ok. handle_event(Event, Fun) when is_function(Fun, 1) -> @@ -51,16 +53,17 @@ handle_event(Event, {Fun, FunAcc}) -> FunAcc2 = Fun(Event, FunAcc), {ok, {Fun, FunAcc2}}; -handle_event({EventAtom, DbName}, Port) -> +handle_event({EventAtom, DbName}, Pid) -> Obj = {[{type, list_to_binary(atom_to_list(EventAtom))}, {db, DbName}]}, - true = port_command(Port, ?JSON_ENCODE(Obj) ++ "\n"), - {ok, Port}. + true = couch_os_process:write(Pid, Obj), + {ok, Pid}. handle_call(_Request, State) -> - {ok, ok, State}. + {reply, ok, State}. -handle_info({'EXIT', _, _Reason}, _Port) -> - remove_handler. +handle_info({'EXIT', Pid, Reason}, Pid) -> + ?LOG_ERROR("Update notification process ~p died: ~p", [Pid, Reason]), + {stop, nil}. code_change(_OldVsn, State, _Extra) -> {ok, State}. Added: couchdb/trunk/src/couchdb/couch_os_process.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_os_process.erl?rev=727132&view=auto ============================================================================== --- couchdb/trunk/src/couchdb/couch_os_process.erl (added) +++ couchdb/trunk/src/couchdb/couch_os_process.erl Tue Dec 16 12:31:51 2008 @@ -0,0 +1,161 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_os_process). +-behaviour(gen_server). + +-export([start_link/1, start_link/2, start_link/3, stop/1]). +-export([set_timeout/2, write/2, read/1, prompt/2, async/3]). +-export([writeline/2, readline/1, writejson/2, readjson/1]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). + +-include("couch_db.hrl"). + +-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]). + +-record(os_proc, + {command, + port, + writer, + reader, + timeout + }). + +start_link(Command) -> + start_link(Command, []). +start_link(Command, Options) -> + start_link(Command, Options, ?PORT_OPTIONS). +start_link(Command, Options, PortOptions) -> + gen_server:start_link(couch_os_process, [Command, Options, PortOptions], []). + +stop(Pid) -> + gen_server:cast(Pid, stop). + +% Read/Write API +set_timeout(Pid, TimeOut) when is_integer(TimeOut) -> + ok = gen_server:call(Pid, {set_timeout, TimeOut}). + +write(Pid, Data) -> + gen_server:call(Pid, {write, Data}). + +read(Pid) -> + gen_server:call(Pid, read). + +prompt(Pid, Data) -> + gen_server:call(Pid, {prompt, Data}). + +async(Pid, Data, CallBack) -> + gen_server:cast(Pid, {async, Data, CallBack}). + +% Utility functions for reading and writing +% in custom functions +writeline(OsProc, Data) when is_record(OsProc, os_proc) -> + port_command(OsProc#os_proc.port, Data ++ "\n"). + +readline(OsProc) when is_record(OsProc, os_proc) -> + readline(OsProc, []). +readline(OsProc, Acc) when is_record(OsProc, os_proc) -> + #os_proc{port=Port} = OsProc, + receive + {Port, {data, {noeol, Data}}} -> + readline(OsProc, [Data|Acc]); + {Port, {data, {eol, Data}}} -> + lists:reverse(Acc, Data); + {Port, Err} -> + catch port_close(Port), + throw({os_process_error, Err}) + after OsProc#os_proc.timeout -> + catch port_close(Port), + throw({os_process_error, "OS process timed out."}) + end. + +% Standard JSON functions +writejson(OsProc, Data) when is_record(OsProc, os_proc) -> + true = writeline(OsProc, ?JSON_ENCODE(Data)). + +readjson(OsProc) when is_record(OsProc, os_proc) -> + Line = readline(OsProc), + case ?JSON_DECODE(Line) of + {[{<<"log">>,Msg}]} when is_binary(Msg) -> + % we got a message to log. Log it and continue + ?LOG_INFO("OS Process Log Message: ~s", [Msg]), + readjson(OsProc); + {[{<<"error">>, Id}, {<<"reason">>, Reason}]} -> + throw({list_to_atom(binary_to_list(Id)),Reason}); + {[{<<"reason">>, Reason}, {<<"error">>, Id}]} -> + throw({list_to_atom(binary_to_list(Id)),Reason}); + Result -> + Result + end. + +% gen_server API +init([Command, Options, PortOptions]) -> + BaseProc = #os_proc{ + command=Command, + port=open_port({spawn, Command}, PortOptions), + writer=fun writejson/2, + reader=fun readjson/1, + timeout=5000 + }, + OsProc = + lists:foldl(fun(Opt, Proc) -> + case Opt of + {writer, Writer} when is_function(Writer) -> + Proc#os_proc{writer=Writer}; + {reader, Reader} when is_function(Reader) -> + Proc#os_proc{reader=Reader}; + {timeout, TimeOut} when is_integer(TimeOut) -> + Proc#os_proc{timeout=TimeOut} + end + end, BaseProc, Options), + {ok, OsProc}. + +terminate(_Reason, #os_proc{port=nil}) -> + ok; +terminate(_Reason, #os_proc{port=Port}) -> + catch port_close(Port), + ok. + +handle_call({set_timeout, TimeOut}, _From, OsProc) -> + {reply, ok, OsProc#os_proc{timeout=TimeOut}}; +handle_call({write, Data}, _From, OsProc) -> + Writer = OsProc#os_proc.writer, + {reply, Writer(OsProc, Data), OsProc}; +handle_call(read, _From, OsProc) -> + Reader = OsProc#os_proc.reader, + {reply, Reader(OsProc), OsProc}; +handle_call({prompt, Data}, _From, OsProc) -> + #os_proc{writer=Writer, reader=Reader} = OsProc, + Writer(OsProc, Data), + {reply, Reader(OsProc), OsProc}. + +handle_cast({async, Data, CallBack}, OsProc) -> + #os_proc{writer=Writer, reader=Reader} = OsProc, + Writer(OsProc, Data), + CallBack(Reader(OsProc)), + {noreply, OsProc}; +handle_cast(stop, OsProc) -> + {stop, normal, OsProc}; +handle_cast(Msg, OsProc) -> + ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]), + {noreply, OsProc}. + +handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) -> + ?LOG_ERROR("OS Process died with status: ~p", [Status]), + {stop, error, OsProc}; +handle_info(Msg, OsProc) -> + ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]), + {noreply, OsProc}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=727132&r1=727131&r2=727132&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_query_servers.erl (original) +++ couchdb/trunk/src/couchdb/couch_query_servers.erl Tue Dec 16 12:31:51 2008 @@ -28,71 +28,20 @@ stop() -> exit(whereis(couch_query_servers), close). -readline(Port) -> - readline(Port, []). - -readline(Port, Acc) -> - case get(query_server_timeout) of - undefined -> - Timeout = list_to_integer(couch_config:get( - "couchdb", "view_timeout", "5000")), - put(query_server_timeout, Timeout); - Timeout -> ok - end, - receive - {Port, {data, {noeol, Data}}} -> - readline(Port, [Data|Acc]); - {Port, {data, {eol, Data}}} -> - lists:reverse(Acc, Data); - {Port, Err} -> - catch port_close(Port), - throw({external_process_error, Err}) - after Timeout -> - catch port_close(Port), - throw({external_process_error, "External process timed out"}) - end. - -read_json(Port) -> - Line = readline(Port), - case ?JSON_DECODE(Line) of - {[{<<"log">>,Msg}]} when is_binary(Msg) -> - % we got a message to log. Log it and continue - ?LOG_INFO("Query Server Log Message: ~s", [Msg]), - read_json(Port); - Else -> - Else - end. - -% send command and get a response. -prompt(Port, Json) -> - Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]), - true = port_command(Port, Bin), - case read_json(Port) of - {[{<<"error">>, Id}, {<<"reason">>, Reason}]} -> - throw({Id,Reason}); - {[{<<"reason">>, Reason}, {<<"error">>, Id}]} -> - throw({Id,Reason}); - Result -> - Result - end. - - start_doc_map(Lang, Functions) -> - Port = get_linked_port(Lang), - % send the functions as json strings + Pid = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> - true = prompt(Port, [<<"add_fun">>, FunctionSource]) - end, - Functions), - {ok, {Lang, Port}}. + true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource]) + end, Functions), + {ok, {Lang, Pid}}. -map_docs({_Lang, Port}, Docs) -> +map_docs({_Lang, Pid}, Docs) -> % send the documents Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - FunsResults = prompt(Port, [<<"map_doc">>, Json]), + FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]), % the results are a json array of function map yields like this: % [FunResults1, FunResults2 ...] % where funresults is are json arrays of key value pairs: @@ -111,30 +60,8 @@ stop_doc_map(nil) -> ok; -stop_doc_map({Lang, Port}) -> - return_linked_port(Lang, Port). - -get_linked_port(Lang) -> - case gen_server:call(couch_query_servers, {get_port, Lang}) of - {ok, Port0} -> - link(Port0), - true = prompt(Port0, [<<"reset">>]), - Port0; - {empty, Cmd} -> - ?LOG_INFO("Spawning new ~s instance.", [Lang]), - open_port({spawn, Cmd}, [stream, - {line, 1000}, - binary, - exit_status, - hide]); - Error -> - throw(Error) - end. - -return_linked_port(Lang, Port) -> - ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}), - true = unlink(Port), - ok. +stop_doc_map({Lang, Pid}) -> + ok = ret_os_process(Lang, Pid). group_reductions_results([]) -> []; @@ -153,29 +80,29 @@ rereduce(_Lang, [], _ReducedValues) -> {ok, []}; rereduce(Lang, RedSrcs, ReducedValues) -> - Port = get_linked_port(Lang), + Pid = get_os_process(Lang), Grouped = group_reductions_results(ReducedValues), Results = lists:zipwith( fun(FunSrc, Values) -> [true, [Result]] = - prompt(Port, [<<"rereduce">>, [FunSrc], Values]), + couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]), Result end, RedSrcs, Grouped), - return_linked_port(Lang, Port), + ok = ret_os_process(Lang, Pid), {ok, Results}. reduce(_Lang, [], _KVs) -> {ok, []}; reduce(Lang, RedSrcs, KVs) -> - Port = get_linked_port(Lang), - [true, Results] = prompt(Port, + Pid = get_os_process(Lang), + [true, Results] = couch_os_process:prompt(Pid, [<<"reduce">>, RedSrcs, KVs]), - return_linked_port(Lang, Port), + ok = ret_os_process(Lang, Pid), {ok, Results}. validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) -> - Port = get_linked_port(Lang), + Pid = get_os_process(Lang), JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), JsonDiskDoc = if DiskDoc == nil -> @@ -183,7 +110,7 @@ true -> couch_doc:to_json_obj(DiskDoc, [revs]) end, - try prompt(Port, + try couch_os_process:prompt(Pid, [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of 1 -> ok; @@ -192,7 +119,7 @@ {[{<<"unauthorized">>, Message}]} -> throw({unauthorized, Message}) after - return_linked_port(Lang, Port) + ok = ret_os_process(Lang, Pid) end. init([]) -> @@ -206,72 +133,101 @@ fun("query_servers" ++ _, _) -> ?MODULE:stop() end), - - QueryServers = couch_config:get("query_servers"), - QueryServers2 = - [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers], - - {ok, {QueryServers2, []}}. + + Langs = ets:new(couch_query_server_langs, [set, private]), + PidLangs = ets:new(couch_query_server_pid_langs, [set, private]), + Pids = ets:new(couch_query_server_procs, [set, private]), + InUse = ets:new(couch_query_server_used, [set, private]), + lists:foreach(fun({Lang, Command}) -> + true = ets:insert(Langs, {?l2b(Lang), Command}) + end, couch_config:get("query_servers")), + {ok, {Langs, PidLangs, Pids, InUse}}. terminate(_Reason, _Server) -> ok. -handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) -> - case proplists:get_value(Lang, LangPorts) of - undefined -> - case proplists:get_value(Lang, QueryServerList) of - undefined -> % not a supported language - {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}; - ServerCmd -> - {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}} - end; - Port -> - Result = - case catch port_connect(Port, FromPid) of - true -> - true = unlink(Port), - {ok, Port}; - Error -> - catch port_close(Port), - Error - end, - {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}} - end; -handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) -> - case catch port_connect(Port, self()) of - true -> - {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}}; +handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) -> + % Note to future self. Add max process limit. + case ets:lookup(Pids, Lang) of + [{Lang, [Pid|_]}] -> + add_value(PidLangs, Pid, Lang), + rem_from_list(Pids, Lang, Pid), + add_to_list(InUse, Lang, Pid), + true = couch_os_process:prompt(Pid, [<<"reset">>]), + {reply, Pid, Server}; _ -> - catch port_close(Port), - {reply, ok, {QueryServerList, LangPorts}} - end. - -handle_cast(_Whatever, {Cmd, Ports}) -> - {noreply, {Cmd, Ports}}. - -handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) -> - case lists:keysearch(Port, 2, LangPorts) of - {value, {Lang, _}} -> + {ok, Pid} = new_process(Langs, Lang), + add_to_list(InUse, Lang, Pid), + {reply, Pid, Server} + end; +handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) -> + % Along with max process limit, here we should check + % if we're over the limit and discard when we are. + add_to_list(Pids, Lang, Pid), + rem_from_list(InUse, Lang, Pid), + {reply, true, Server}. + +handle_cast(_Whatever, Server) -> + {noreply, Server}. + +handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) -> + case ets:lookup(PidLangs, Pid) of + [{Pid, Lang}] -> case Status of - 0 -> ok; - _ -> ?LOG_ERROR("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status]) + normal -> ok; + _ -> ?LOG_DEBUG("Linked process died abnromally: ~p (reason: ~p)", [Pid, Status]) end, - {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}}; - _ -> - ?LOG_ERROR("Unknown linked port/process crash: ~p", [Port]) + {ok, { + Langs, + rem_value(PidLangs, Pid), + rem_from_list(Pids, Lang, Pid), + rem_from_list(InUse, Lang, Pid) + }}; + [] -> + ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), + {ok, {Langs, PidLangs, Pids, InUse}} end. code_change(_OldVsn, State, _Extra) -> {ok, State}. -% test() -> -% test("../js/js -f main.js"). -% -% test(Cmd) -> -% start_link(Cmd), -% {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc) {if (doc[0] == 'a') return doc[1];}">>]), -% {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]), -% io:format("Results: ~w~n", [Results]), -% stop_doc_map(DocMap), -% ok. +% Private API + +new_process(Langs, Lang) -> + Proc = + case ets:lookup(Langs, Lang) of + [{Lang, Command}] -> + couch_os_process:start_link(Command); + _ -> + throw({unknown_query_language, Lang}) + end, + Proc. + +get_os_process(Lang) -> + gen_server:call(couch_query_servers, {get_proc, Lang}). + +ret_os_process(Lang, Pid) -> + true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}), + ok. + +add_value(Tid, Key, Value) -> + true = ets:insert(Tid, {Key, Value}). + +rem_value(Tid, Key) -> + true = ets:insert(Tid, Key). + +add_to_list(Tid, Key, Value) -> + case ets:lookup(Tid, Key) of + [{Key, Vals}] -> + true = ets:insert(Tid, {Key, [Value|Vals]}); + [] -> + true = ets:insert(Tid, {Key, [Value]}) + end. + +rem_from_list(Tid, Key, Value) -> + case ets:lookup(Tid, Key) of + [{Key, Vals}] -> + ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]}); + [] -> ok + end.