couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject [2/2] git commit: More efficient communication with the view server
Date Wed, 16 Nov 2011 11:56:18 GMT
More efficient communication with the view server

This change makes the communication between the Erlang VM and
an external view server (e.g. couchjs) more efficient by
writing a series of commands into the port and only then reading
all the responses from the external view server, after all those
writes are done. This minimizes the amount of time each endpoint
spends blocked reading from the port.

COUCHDB-1334


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/a851c6e5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/a851c6e5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/a851c6e5

Branch: refs/heads/master
Commit: a851c6e5150d14221ca018587d76214856c1555a
Parents: 7299264
Author: Filipe David Borba Manana <fdmanana@apache.org>
Authored: Sun Nov 6 14:25:04 2011 +0000
Committer: Filipe David Borba Manana <fdmanana@apache.org>
Committed: Wed Nov 16 11:55:28 2011 +0000

----------------------------------------------------------------------
 src/couch_mrview/src/couch_mrview_updater.erl |   46 ++++++++++++--------
 src/couchdb/couch_native_process.erl          |   11 ++++-
 src/couchdb/couch_os_process.erl              |   38 ++++++++++++++++-
 src/couchdb/couch_query_servers.erl           |   17 +++++--
 4 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/a851c6e5/src/couch_mrview/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9604ea9..3014664 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -130,32 +130,42 @@ map_docs(Parent, State0) ->
             couch_query_servers:stop_doc_map(State0#mrst.qserver),
             couch_work_queue:close(State0#mrst.write_queue);
         {ok, Dequeued} ->
-            % Run all the non deleted docs through the view engine and
-            % then pass the results on to the writer process.
             State1 = case State0#mrst.qserver of
                 nil -> start_query_server(State0);
                 _ -> State0
             end,
-            QServer = State1#mrst.qserver,
-            DocFun = fun
-                ({nil, Seq, _}, {SeqAcc, Results}) ->
-                    {erlang:max(Seq, SeqAcc), Results};
-                ({Id, Seq, deleted}, {SeqAcc, Results}) ->
-                    {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
-                ({Id, Seq, Doc}, {SeqAcc, Results}) ->
-                    {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
-                    {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
-            end,
-            FoldFun = fun(Docs, Acc) ->
-                update_task(length(Docs)),
-                lists:foldl(DocFun, Acc, Docs)
-            end,
-            Results = lists:foldl(FoldFun, {0, []}, Dequeued),
-            couch_work_queue:queue(State1#mrst.write_queue, Results),
+            {ok, MapResults} = compute_map_results(State1, Dequeued),
+            couch_work_queue:queue(State1#mrst.write_queue, MapResults),
             map_docs(Parent, State1)
     end.
 
 
+compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
+    % Run all the non deleted docs through the view engine and
+    % then pass the results on to the writer process.
+    DocFun = fun
+        ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
+        ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
+        ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
+    end,
+    FoldFun = fun(Docs, Acc) ->
+        lists:foldl(DocFun, Acc, Docs)
+    end,
+    {MaxSeq, DeletedResults, Docs} =
+        lists:foldl(FoldFun, {0, [], []}, Dequeued),
+    {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
+    NotDeletedResults = lists:zipwith(
+        fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
+        Docs,
+        MapResultList),
+    AllMapResults = DeletedResults ++ NotDeletedResults,
+    update_task(length(AllMapResults)),
+    {ok, {MaxSeq, AllMapResults}}.
+
+
 write_results(Parent, State) ->
     case couch_work_queue:dequeue(State#mrst.write_queue) of
         closed ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a851c6e5/src/couchdb/couch_native_process.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl
index 5a32e75..b1d51ed 100644
--- a/src/couchdb/couch_native_process.erl
+++ b/src/couchdb/couch_native_process.erl
@@ -42,7 +42,7 @@
 
 -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
          handle_info/2]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
 
 -define(STATE, native_proc_state).
 -record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) ->
 prompt(Pid, Data) when is_list(Data) ->
     gen_server:call(Pid, {prompt, Data}).
 
+prompt_many(Pid, DataList) ->
+    prompt_many(Pid, DataList, []).
+
+prompt_many(_Pid, [], Acc) ->
+    {ok, lists:reverse(Acc)};
+prompt_many(Pid, [Data | Rest], Acc) ->
+    Result = prompt(Pid, Data),
+    prompt_many(Pid, Rest, [Result | Acc]).
+
 % gen_server callbacks
 init([]) ->
     {ok, #evstate{ddocs=dict:new()}}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a851c6e5/src/couchdb/couch_os_process.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl
index db62d49..3a267be 100644
--- a/src/couchdb/couch_os_process.erl
+++ b/src/couchdb/couch_os_process.erl
@@ -14,7 +14,7 @@
 -behaviour(gen_server).
 
 -export([start_link/1, start_link/2, start_link/3, stop/1]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
 -export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
 
@@ -57,6 +57,40 @@ prompt(Pid, Data) ->
             throw(Error)
     end.
 
+prompt_many(Pid, DataList) ->
+    OsProc = gen_server:call(Pid, get_os_proc, infinity),
+    true = port_connect(OsProc#os_proc.port, self()),
+    try
+        send_many(OsProc, DataList),
+        receive_many(length(DataList), OsProc, [])
+    after
+        % Can throw badarg error, when OsProc Pid is dead or port was closed
+        % by the readline function on error/timeout.
+        (catch port_connect(OsProc#os_proc.port, Pid)),
+        unlink(OsProc#os_proc.port),
+        drop_port_messages(OsProc#os_proc.port)
+    end.
+
+send_many(_OsProc, []) ->
+    ok;
+send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
+    Writer(OsProc, Data),
+    send_many(OsProc, Rest).
+
+receive_many(0, _OsProc, Acc) ->
+    {ok, lists:reverse(Acc)};
+receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
+    Line = Reader(OsProc),
+    receive_many(N - 1, OsProc, [Line | Acc]).
+
+drop_port_messages(Port) ->
+    receive
+    {Port, _} ->
+        drop_port_messages(Port)
+    after 0 ->
+        ok
+    end.
+
 % Utility functions for reading and writing
 % in custom functions
 writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -175,6 +209,8 @@ terminate(_Reason, #os_proc{port=Port}) ->
     catch port_close(Port),
     ok.
 
+handle_call(get_os_proc, _From, OsProc) ->
+    {reply, OsProc, OsProc};
 handle_call({set_timeout, TimeOut}, _From, OsProc) ->
     {reply, ok, OsProc#os_proc{timeout=TimeOut}};
 handle_call({prompt, Data}, _From, OsProc) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a851c6e5/src/couchdb/couch_query_servers.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 03f2012..c9c2bc6 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -16,7 +16,7 @@
 -export([start_link/0, config_change/1]).
 
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
 -export([reduce/3, rereduce/3,validate_doc_update/5]).
 -export([filter_docs/5]).
 -export([filter_view/3]).
@@ -33,6 +33,7 @@
     lang,
     ddoc_keys = [],
     prompt_fun,
+    prompt_many_fun,
     set_timeout_fun,
     stop_fun
 }).
@@ -83,10 +84,15 @@ map_docs(Proc, Docs) ->
         Docs),
     {ok, Results}.
 
-map_doc_raw(Proc, Doc) ->
-    Json = couch_doc:to_json_obj(Doc, []),
-    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
-
+map_docs_raw(Proc, DocList) ->
+    {Mod, Fun} = Proc#proc.prompt_many_fun,
+    CommandList = lists:map(
+        fun(Doc) ->
+            EJson = couch_doc:to_json_obj(Doc, []),
+            [<<"map_doc">>, EJson]
+        end,
+        DocList),
+    Mod:Fun(Proc#proc.pid, CommandList).
 
 stop_doc_map(nil) ->
     ok;
@@ -479,6 +485,7 @@ new_process(Langs, LangLimits, Lang) ->
                        pid=Pid,
                        % Called via proc_prompt, proc_set_timeout, and proc_stop
                        prompt_fun={Mod, prompt},
+                       prompt_many_fun={Mod, prompt_many},
                        set_timeout_fun={Mod, set_timeout},
                        stop_fun={Mod, stop}}};
         _ ->


Mime
View raw message