couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jch...@apache.org
Subject svn commit: r724946 - in /incubator/couchdb/trunk: share/www/script/ src/couchdb/
Date Wed, 10 Dec 2008 01:13:18 GMT
Author: jchris
Date: Tue Dec  9 17:13:17 2008
New Revision: 724946

URL: http://svn.apache.org/viewvc?rev=724946&view=rev
Log:
view group state gen_server. thanks damien and davisp.

Added:
    incubator/couchdb/trunk/src/couchdb/couch_view_group.erl
    incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl
Modified:
    incubator/couchdb/trunk/share/www/script/couch_tests.js
    incubator/couchdb/trunk/src/couchdb/Makefile.am
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_db.hrl
    incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl
    incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
    incubator/couchdb/trunk/src/couchdb/couch_httpd_view.erl
    incubator/couchdb/trunk/src/couchdb/couch_server.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl

Modified: incubator/couchdb/trunk/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch_tests.js?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] (original)
+++ incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] Tue Dec  9 17:13:17 2008
@@ -1905,6 +1905,7 @@
     }
     T(db.view("test/single_doc").total_rows == 1);
     
+    var info = db.info();
     var doc1 = db.open("1");
     var doc2 = db.open("2");
     
@@ -1913,7 +1914,13 @@
       body: JSON.stringify({"1":[doc1._rev], "2":[doc2._rev]}),
     });
     T(xhr.status == 200);
-    
+
+    var newInfo = db.info();
+    // purging increments the update sequence
+    T(info.update_seq+1 == newInfo.update_seq);
+    // and it increments the purge_seq
+    T(info.purge_seq+1 == newInfo.purge_seq);
+
     var result = JSON.parse(xhr.responseText);
     T(result.purged["1"][0] == doc1._rev);
     T(result.purged["2"][0] == doc2._rev);

Modified: incubator/couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/Makefile.am?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/Makefile.am (original)
+++ incubator/couchdb/trunk/src/couchdb/Makefile.am Tue Dec  9 17:13:17 2008
@@ -63,6 +63,8 @@
     couch_stream.erl \
     couch_util.erl \
     couch_view.erl \
+    couch_view_updater.erl \
+    couch_view_group.erl \
     couch_db_updater.erl
 
 EXTRA_DIST = $(source_files) couch_db.hrl
@@ -92,6 +94,8 @@
     couch_stream.beam \
     couch_util.beam \
     couch_view.beam \
+    couch_view_updater.beam \
+    couch_view_group.beam \
     couch_db_updater.beam
 
 # doc_base = \

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Tue Dec  9 17:13:17 2008
@@ -17,7 +17,7 @@
 -export([open_ref_counted/3,num_refs/1,monitor/1]).
 -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
 -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
--export([get_missing_revs/2,name/1,doc_to_tree/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]).
 -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
 -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
@@ -145,6 +145,9 @@
     gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
     
 
+get_update_seq(#db{header=#db_header{update_seq=Seq}})->
+    Seq.
+    
 get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
     PurgeSeq.
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.hrl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.hrl Tue Dec  9 17:13:17 2008
@@ -88,7 +88,7 @@
 
 
 -record(user_ctx,
-    {name=nil,
+    {name=null,
     roles=[]
     }).
 
@@ -152,6 +152,41 @@
     include_docs = false
 }).
 
+-record(group,
+    {sig=nil,
+    db=nil,
+    fd=nil,
+    name,
+    def_lang,
+    views,
+    id_btree=nil,
+    current_seq=0,
+    purge_seq=0,
+    query_server=nil
+    }).
+
+-record(view,
+    {id_num,
+    map_names=[],
+    def,
+    btree=nil,
+    reduce_funs=[]
+    }).
+
+-record(server,{
+    root_dir = [],
+    dbname_regexp,
+    max_dbs_open=100,
+    current_dbs_open=0,
+    start_time=""
+    }).
+
+-record(index_header,
+    {seq=0,
+    purge_seq=0,
+    id_btree_state=nil,
+    view_states=nil
+    }).
 
 % small value used in revision trees to indicate the revision isn't stored
 -define(REV_MISSING, []).

Modified: incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl Tue Dec  9 17:13:17 2008
@@ -129,7 +129,7 @@
         Db#db{
             fulldocinfo_by_id_btree = DocInfoByIdBTree2,
             docinfo_by_seq_btree = DocInfoBySeqBTree2,
-            update_seq = NewSeq,
+            update_seq = NewSeq + 1,
             header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
     
     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),

Modified: incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_httpd.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_httpd.erl Tue Dec  9 17:13:17 2008
@@ -261,7 +261,7 @@
         [User, Pass] ->
             {User, Pass};
         [User] ->
-            {User, <<"">>};
+            {User, ""};
         _ ->
             nil
         end;

Modified: incubator/couchdb/trunk/src/couchdb/couch_httpd_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_httpd_view.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_httpd_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_httpd_view.erl Tue Dec  9 17:13:17 2008
@@ -22,18 +22,19 @@
     start_json_response/2,send_chunk/2,end_json_response/1]).
 
 design_doc_view(Req, Db, Id, ViewName, Keys) ->
+    #view_query_args{
+        update = Update,
+        reduce = Reduce
+    } = QueryArgs = parse_view_query(Req, Keys),
     case couch_view:get_map_view({couch_db:name(Db), 
-            <<"_design/", Id/binary>>, ViewName}) of
+            <<"_design/", Id/binary>>, ViewName, Update}) of
     {ok, View} ->    
-        QueryArgs = parse_view_query(Req, Keys),
         output_map_view(Req, View, Db, QueryArgs, Keys);
     {not_found, Reason} ->
         case couch_view:get_reduce_view({couch_db:name(Db),
                 <<"_design/", Id/binary>>, ViewName}) of
         {ok, View} ->
-            #view_query_args{
-                reduce = Reduce
-            } = QueryArgs = parse_view_query(Req, Keys, true),
+            parse_view_query(Req, Keys, true), % just for validation
             case Reduce of
             false ->
                 {reduce, _N, _Lang, MapView} = View,

Modified: incubator/couchdb/trunk/src/couchdb/couch_server.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_server.erl Tue Dec  9 17:13:17 2008
@@ -22,14 +22,6 @@
 
 -include("couch_db.hrl").
 
--record(server,{
-    root_dir = [],
-    dbname_regexp,
-    max_dbs_open=100,
-    current_dbs_open=0,
-    start_time=""
-    }).
-
 start() ->
     start(["default.ini"]).
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=724946&r1=724945&r2=724946&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Tue Dec  9 17:13:17 2008
@@ -13,45 +13,12 @@
 -module(couch_view).
 -behaviour(gen_server).
 
--export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).
+-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
 -export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]).
 
 -include("couch_db.hrl").
-
--record(group,
-    {sig=nil,
-    db=nil,
-    fd=nil,
-    name,
-    def_lang,
-    views,
-    id_btree=nil,
-    current_seq=0,
-    purge_seq=0,
-    query_server=nil
-    }).
-
--record(view,
-    {id_num,
-    map_names=[],
-    def,
-    btree=nil,
-    reduce_funs=[]
-    }).
-
--record(server,
-    {root_dir
-    }).
-
--record(index_header,
-    {seq=0,
-    purge_seq=0,
-    id_btree_state=nil,
-    view_states=nil
-    }).
     
-
 start_link() ->
     gen_server:start_link({local, couch_view}, couch_view, [], []).
 
@@ -59,41 +26,30 @@
     {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),
     Pid.
 
-get_updater(DbName, GroupId) ->
-    {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}),
+get_group_server(DbName, GroupId) ->
+    {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}),
     Pid.
     
-get_updated_group(Pid) ->
-    Mref = erlang:monitor(process, Pid),
-    receive
-    {'DOWN', Mref, _, _, Reason} ->
-        throw(Reason)
-    after 0 ->
-        Pid ! {self(), get_updated},
-        receive
-        {Pid, Response} ->
-            erlang:demonitor(Mref),
-            receive
-                {'DOWN', Mref, _, _, _} -> ok
-                after 0 -> ok
-            end,
-            Response;
-        {'DOWN', Mref, _, _, Reason} ->
-            throw(Reason)
-        end
-    end.
+get_updated_group(DbName, GroupId, Update) ->
+    couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)).
+
+get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) ->
+    couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)).
 
 get_row_count(#view{btree=Bt}) ->
     {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt),
     {ok, Count}.
 
 get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
-    {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)),
+    {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true),
     {ok, {temp_reduce, View}};
 get_reduce_view({DbName, GroupId, Name}) ->
-    {ok, #group{views=Views,def_lang=Lang}} =
-            get_updated_group(get_updater(DbName, GroupId)),
-    get_reduce_view0(Name, Lang, Views).
+    case get_updated_group(DbName, GroupId, true) of
+    {error, Reason} ->
+        Reason;
+    {ok, #group{views=Views,def_lang=Lang}} ->
+        get_reduce_view0(Name, Lang, Views)
+    end.
 
 get_reduce_view0(_Name, _Lang, []) ->
     {not_found, missing_named_view};
@@ -153,13 +109,26 @@
     N + 1;
 get_key_pos(Key, [_|Rest], N) ->
     get_key_pos(Key, Rest, N+1).
+       
+seq_for_update(DbName, Update) ->
+    case Update of
+    true ->
+        {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []),
+        CurrentSeq;
+    _Else ->
+        0
+    end.  
         
 get_map_view({temp, DbName, Type, Src}) ->
-    {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),
+    {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true),
     {ok, View};
-get_map_view({DbName, GroupId, Name}) ->
-    {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),
-    get_map_view0(Name, Views).
+get_map_view({DbName, GroupId, Name, Update}) ->
+    case get_updated_group(DbName, GroupId, Update) of
+    {error, Reason} ->
+        Reason;
+    {ok, #group{views=Views}} ->
+        get_map_view0(Name, Views)
+    end.
 
 get_map_view0(_Name, []) ->
     {not_found, missing_named_view};
@@ -183,37 +152,6 @@
     Count.
                 
 
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
-    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
-    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
-            
-    % add the views to a dictionary object, with the map source as the key
-    DictBySrc =
-    lists:foldl(
-        fun({Name, {MRFuns}}, DictBySrcAcc) ->
-            MapSrc = proplists:get_value(<<"map">>, MRFuns),
-            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
-            View =
-            case dict:find(MapSrc, DictBySrcAcc) of
-                {ok, View0} -> View0;
-                error -> #view{def=MapSrc} % create new view object
-            end,
-            View2 =
-            if RedSrc == null ->
-                View#view{map_names=[Name|View#view.map_names]};
-            true ->
-                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
-            end,
-            dict:store(MapSrc, View2, DictBySrcAcc)
-        end, dict:new(), RawViews),
-    % number the views
-    {Views, _N} = lists:mapfoldl(
-        fun({_Src, View}, N) ->
-            {View#view{id_num=N},N+1}
-        end, 0, dict:to_list(DictBySrc)),
-    
-    Group = #group{name=Id, views=Views, def_lang=Language},
-    Group#group{sig=erlang:md5(term_to_binary(Group))}.
     
 fold_fun(_Fun, [], _, Acc) ->
     {ok, Acc};
@@ -253,10 +191,10 @@
         (_Else) ->
             ok
         end),
-    ets:new(couch_views_by_db, [bag, private, named_table]),
-    ets:new(couch_views_by_name, [set, protected, named_table]),
-    ets:new(couch_views_by_updater, [set, private, named_table]),
-    ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]),
+    ets:new(couch_groups_by_db, [bag, private, named_table]),
+    ets:new(group_servers_by_name, [set, protected, named_table]),
+    ets:new(couch_groups_by_updater, [set, private, named_table]),
+    ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]),
     process_flag(trap_exit, true),
     {ok, #server{root_dir=RootDir}}.
 
@@ -268,9 +206,9 @@
     <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})),
     Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
     Pid = 
-    case ets:lookup(couch_views_by_name, {DbName, Name}) of
+    case ets:lookup(group_servers_by_name, {DbName, Name}) of
     [] ->
-        case ets:lookup(couch_views_temp_fd_by_db, DbName) of
+        case ets:lookup(couch_temp_group_fd_by_db, DbName) of
         [] ->
             FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp",
             {ok, Fd} = couch_file:open(FileName, [create, overwrite]),
@@ -279,21 +217,20 @@
             ok
         end,
         ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
-        NewPid = spawn_link(couch_view, start_temp_update_loop,
-                    [DbName, Fd, Lang, MapSrc, RedSrc]),
-        true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
+        {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}),
+        true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),
         add_to_ets(NewPid, DbName, Name),
         NewPid;
     [{_, ExistingPid0}] ->
         ExistingPid0
     end,
     {reply, {ok, Pid}, Server};
-handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
+handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
     Pid = 
-    case ets:lookup(couch_views_by_name, {DbName, GroupId}) of
+    case ets:lookup(group_servers_by_name, {DbName, GroupId}) of
     [] ->
-        ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]),
-        NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]),
+        ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),
+        {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}),
         add_to_ets(NewPid, DbName, GroupId),
         NewPid;
     [{_, ExistingPid0}] ->
@@ -303,11 +240,11 @@
 
 handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
     % shutdown all the updaters
-    Names = ets:lookup(couch_views_by_db, DbName),
+    Names = ets:lookup(couch_groups_by_db, DbName),
     lists:foreach(
         fun({_DbName, GroupId}) ->
             ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]),
-            [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}),
+            [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}),
             exit(Pid, kill),
             receive {'EXIT', Pid, _} ->
                 delete_from_ets(Pid, DbName, GroupId)
@@ -318,22 +255,23 @@
     {noreply, Server}.
 
 handle_info({'EXIT', _FromPid, normal}, Server) ->
-    {noreply, Server};
+   {noreply, Server};
 handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
-    case ets:lookup(couch_views_by_updater, FromPid) of
+    ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]),
+    case ets:lookup(couch_groups_by_updater, FromPid) of
     [] -> % non-updater linked process must have died, we propagate the error
         ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
         exit(Reason);
     [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId),
-        [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName),
+        [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),
         case Count of
         1 -> % Last ref
             couch_file:close(Fd),
             file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
-            true = ets:delete(couch_views_temp_fd_by_db, DbName);
+            true = ets:delete(couch_temp_group_fd_by_db, DbName);
         _ ->
-            true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1})
+            true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1})
         end;
     [{_, {DbName, GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId)
@@ -344,225 +282,21 @@
     exit({error, Msg}).
     
 add_to_ets(Pid, DbName, GroupId) ->
-    true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),
-    true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}),
-    true = ets:insert(couch_views_by_db, {DbName, GroupId}).
+    true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),
+    true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}),
+    true = ets:insert(couch_groups_by_db, {DbName, GroupId}).
     
 delete_from_ets(Pid, DbName, GroupId) ->
-    true = ets:delete(couch_views_by_updater, Pid),
-    true = ets:delete(couch_views_by_name, {DbName, GroupId}),
-    true = ets:delete_object(couch_views_by_db, {DbName, GroupId}).
+    true = ets:delete(couch_groups_by_updater, Pid),
+    true = ets:delete(group_servers_by_name, {DbName, GroupId}),
+    true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
-    NotifyPids = get_notify_pids(1000),
-    case couch_db:open(DbName, []) of
-    {ok, Db} ->
-        View = #view{map_names=["_temp"],
-            id_num=0,
-            btree=nil,
-            def=MapSrc,
-            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
-        Group = #group{name="_temp",
-            db=Db,
-            views=[View],
-            current_seq=0,
-            def_lang=Lang,
-            id_btree=nil},
-        Group2 = init_group(Db, Fd, Group,nil),
-        couch_db:monitor(Db),
-        couch_db:close(Db),
-        temp_update_loop(DbName, Group2, NotifyPids);
-    Else ->
-        exit(Else)
-    end.
 
-temp_update_loop(DbName, Group, NotifyPids) ->
-    {ok, Db} = couch_db:open(DbName, []),
-    {_Updated, Group2} = update_group(Group#group{db=Db}),
-    couch_db:close(Db),
-    [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
-    garbage_collect(),
-    temp_update_loop(DbName, Group2, get_notify_pids(10000)).
-
-
-reset_group(#group{views=Views}=Group) ->
-    Views2 = [View#view{btree=nil} || View <- Views],
-    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
-            id_btree=nil,views=Views2}.
-
-start_update_loop(RootDir, DbName, GroupId) ->
-    % wait for a notify request before doing anything. This way, we can just
-    % exit and any exits will be noticed by the callers.
-    start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
-    
-start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
-    {Db, Group} =
-    case (catch couch_db:open(DbName, [])) of
-    {ok, Db0} ->
-        case (catch couch_db:open_doc(Db0, GroupId)) of
-        {ok, Doc} ->
-            {Db0, design_doc_to_view_group(Doc)};
-        Else ->
-            delete_index_file(RootDir, DbName, GroupId),
-            exit(Else)
-        end;
-    Else ->
-        delete_index_file(RootDir, DbName, GroupId),
-        exit(Else)
-    end,
-    FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ 
-            binary_to_list(GroupId) ++".view",
-    Group2 =
-    case couch_file:open(FileName) of
-    {ok, Fd} ->
-        Sig = Group#group.sig,
-        case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
-        {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
-            % sigs match!
-            DbPurgeSeq = couch_db:get_purge_seq(Db),
-            case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of
-            true ->
-                % We can only use index with the same, or next purge seq as the
-                % db.
-                init_group(Db, Fd, Group, HeaderInfo);
-            false ->
-                reset_file(Db, Fd, DbName, Group)
-            end;
-        _ ->
-            reset_file(Db, Fd, DbName, Group)
-        end;
-    {error, enoent} ->
-        case couch_file:open(FileName, [create]) of
-        {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
-        Error    -> throw(Error)
-        end
-    end,
-    
-    couch_db:monitor(Db),
-    couch_db:close(Db),
-    update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
-    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
-    ok = couch_file:truncate(Fd, 0),
-    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
-    init_group(Db, Fd, reset_group(Group), nil).
-
-update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
-    {ok, Db}= couch_db:open(DbName, []),
-    Result =
-    try
-        update_group(Group#group{db=Db})
-    catch
-        throw: restart -> restart
-    after
-        couch_db:close(Db)
-    end,
-    case Result of
-    {same, Group2} ->
-        [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
-        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
-    {updated, Group2} ->
-        HeaderData = {Sig, get_index_header_data(Group2)},
-        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
-        [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
-        garbage_collect(),
-        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
-    restart ->
-        couch_file:close(Group#group.fd),
-        start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
-    end.
 
-% wait for the first request to come in.
-get_notify_pids(Wait) ->
-    receive
-    {Pid, get_updated} ->
-        [Pid | get_notify_pids()];
-    {'DOWN', _MonitorRef, _Type, _Pid, _Info} ->
-        ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []),
-        exit(db_shutdown);
-    Else ->
-        ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]),
-        exit({error, Else})
-    after Wait ->
-        exit(wait_timeout)
-    end.
-% then keep getting all available and return.
-get_notify_pids() ->
-    receive
-    {Pid, get_updated} ->
-        [Pid | get_notify_pids()]
-    after 0 ->
-        []
-    end.
-    
-purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
-    {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
-    Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
-    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
-
-    % now populate the dictionary with all the keys to delete
-    ViewKeysToRemoveDict = lists:foldl(
-        fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->
-            lists:foldl(
-                fun({ViewNum, RowKey}, ViewDictAcc2) ->
-                    dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)
-                end, ViewDictAcc, ViewNumRowKeys);
-        ({not_found, _}, ViewDictAcc) ->
-            ViewDictAcc
-        end, dict:new(), Lookups),
-    
-    % Now remove the values from the btrees
-    Views2 = lists:map(
-        fun(#view{id_num=Num,btree=Btree}=View) ->
-            case dict:find(Num, ViewKeysToRemoveDict) of
-            {ok, RemoveKeys} ->
-                {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),
-                View#view{btree=Btree2};
-            error -> % no keys to remove in this view
-                View
-            end
-        end, Views),
-    Group#group{id_btree=IdBtree2,
-            views=Views2,
-            purge_seq=couch_db:get_purge_seq(Db)}.
-    
-    
-update_group(#group{db=Db,current_seq=CurrentSeq,
-        purge_seq=GroupPurgeSeq}=Group) ->
-    ViewEmptyKVs = [{View, []} || View <- Group#group.views],
-    % compute on all docs modified since we last computed.
-    DbPurgeSeq = couch_db:get_purge_seq(Db),
-    Group2 =
-    case DbPurgeSeq of
-    GroupPurgeSeq -> 
-        Group;
-    DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq ->
-        purge(Group);
-    _ ->
-        throw(restart)
-    end,
-    {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}
-        = couch_db:enum_docs_since(
-            Db,
-            CurrentSeq,
-            fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
-            {[], Group2, ViewEmptyKVs, [], CurrentSeq}
-            ),
-    {Group4, Results} = view_compute(Group3, UncomputedDocs),
-    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
-    couch_query_servers:stop_doc_map(Group4#group.query_server),
-    if CurrentSeq /= NewSeq ->
-        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
-        {updated, Group5#group{query_server=nil}};
-    true ->
-        {same, Group4#group{query_server=nil}}
-    end.
-    
 delete_index_dir(RootDir, DbName) ->
     nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design").
 
@@ -583,50 +317,6 @@
         ok = file:del_dir(Dir)
     end.
 
-delete_index_file(RootDir, DbName, GroupId) ->
-    file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
-            ++ binary_to_list(GroupId) ++ ".view").
-
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
-    init_group(Db, Fd, Group,
-        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
-            id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
-     #index_header{seq=Seq, purge_seq=PurgeSeq,
-            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
-    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
-    Views2 = lists:zipwith(
-        fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
-            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
-            ReduceFun = 
-                fun(reduce, KVs) ->
-                    KVs2 = expand_dups(KVs,[]),
-                    KVs3 = detuple_kvs(KVs2,[]),
-                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3),
-                    {length(KVs3), Reduced};
-                (rereduce, Reds) ->
-                    Count = lists:sum([Count0 || {Count0, _} <- Reds]),
-                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
-                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds),
-                    {Count, Reduced}
-                end,
-            {ok, Btree} = couch_btree:open(BtreeState, Fd,
-                        [{less, fun less_json_keys/2},{reduce, ReduceFun}]),
-            View#view{btree=Btree}
-        end,
-        ViewStates, Views),
-    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
-        id_btree=IdBtree, views=Views2}.
-
-
-get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, 
-            id_btree=IdBtree,views=Views}) ->
-    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
-    #index_header{seq=Seq,
-            purge_seq=PurgeSeq,
-            id_btree_state=couch_btree:get_state(IdBtree),
-            view_states=ViewStates}.
-
 % keys come back in the language of btree - tuples.
 less_json_keys(A, B) ->
     less_json(tuple_to_list(A), tuple_to_list(B)).
@@ -703,129 +393,4 @@
         end
     end.
 
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
-    % This fun computes once for each document
-    #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,
-    case DocId of
-    GroupId ->
-        % uh oh. this is the design doc with our definitions. See if
-        % anything in the definition changed.
-        case couch_db:open_doc(Db, DocInfo) of
-        {ok, Doc} ->
-            case design_doc_to_view_group(Doc) of
-            #group{sig=Sig} ->
-                % The same md5 signature, keep on computing
-                {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
-            _ ->
-                throw(restart)
-            end;
-        {not_found, deleted} ->
-            throw(restart)
-        end;
-    <<?DESIGN_DOC_PREFIX, _>> -> % we skip design docs
-        {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
-    _ ->
-        {Docs2, DocIdViewIdKeys2} =
-        if Deleted ->
-            {Docs, [{DocId, []} | DocIdViewIdKeys]};
-        true ->
-            {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
-            {[Doc | Docs], DocIdViewIdKeys}
-        end,
-        case couch_util:should_flush() of
-        true ->
-            {Group1, Results} = view_compute(Group, Docs2),
-            {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
-            {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),
-            garbage_collect(),
-            ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
-            {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};
-        false ->
-            {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}
-        end
-    end.
-
-view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
-    {ViewKVs, DocIdViewIdKeysAcc};
-view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
-    {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),
-    NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
-    view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
-
-
-view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
-    {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
-view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
-    % Take any identical keys and combine the values
-    ResultKVs2 = lists:foldl(
-        fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->
-            case Key == PrevKey of
-            true ->
-                case PrevVal of
-                {dups, Dups} ->
-                    [{PrevKey, {dups, [Value|Dups]}} | AccRest];
-                _ ->
-                    [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]
-                end;
-            false ->
-                [{Key,Value},{PrevKey,PrevVal}|AccRest]
-            end;
-        (KV, []) ->
-           [KV] 
-        end, [], lists:sort(ResultKVs)),
-    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
-    NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
-    NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
-    NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
-    view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
-
-view_compute(Group, []) ->
-    {Group, []};
-view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
-    {ok, QueryServer} =
-    case QueryServerIn of
-    nil -> % doc map not started
-        Definitions = [View#view.def || View <- Group#group.views],
-        couch_query_servers:start_doc_map(DefLang, Definitions);
-    _ ->
-        {ok, QueryServerIn}
-    end,
-    {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
-    {Group#group{query_server=QueryServer}, Results}.
-
 
-
-write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
-    #group{id_btree=IdBtree} = Group,
-
-    AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
-    RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
-    LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
-    {ok, LookupResults, IdBtree2}
-        = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),
-    KeysToRemoveByView = lists:foldl(
-        fun(LookupResult, KeysToRemoveByViewAcc) ->
-            case LookupResult of
-            {ok, {DocId, ViewIdKeys}} ->
-                lists:foldl(
-                    fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->
-                        dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)
-                    end,
-                    KeysToRemoveByViewAcc, ViewIdKeys);
-            {not_found, _} ->
-                KeysToRemoveByViewAcc
-            end
-        end,
-        dict:new(), LookupResults),
-
-    Views2 = [
-        begin
-            KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),
-            {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
-            View#view{btree = ViewBtree2}
-        end
-    ||
-        {View, AddKeyValues} <- ViewKeyValuesToAdd
-    ],
-    Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
-    {ok, Group2}.

Added: incubator/couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=724946&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view_group.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_view_group.erl Tue Dec  9 17:13:17 2008
@@ -0,0 +1,192 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_group).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1, request_group/2]).
+% -export([design_doc_to_view_group/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+	 terminate/2, code_change/3]).
+
+-include("couch_db.hrl").
+	 
+-record(group_state, {
+    spawn_fun,
+    target_seq=0,
+    group_seq=0,
+    group=nil,
+    updater_pid=nil,
+    waiting_list=[]
+}).
+
+% api methods
+request_group(Pid, Seq) ->
+    ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
+    case gen_server:call(Pid, {request_group, Seq}, infinity) of
+    {ok, Group} ->
+        ?LOG_DEBUG("get_updated_group replied with group", []),
+        {ok, Group};
+    Else ->
+        ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
+        Else
+    end.
+
+
+% from template
+start_link(InitArgs) ->
+    gen_server:start_link(couch_view_group, InitArgs, []).
+
+% init differentiates between temp and design_doc views. It creates a closure
+% which spawns the appropriate view_updater. (It might also spawn the first
+% view_updater run.)
+init(InitArgs) ->
+    SpawnFun = fun() -> spawn_updater(InitArgs) end,
+    process_flag(trap_exit, true),
+    {ok, #group_state{spawn_fun=SpawnFun}}.
+
+% There are two sources of messages: couch_view, which requests an up to date
+% view group, and the couch_view_updater, which when spawned, updates the
+% group and sends it back here. We employ a caching mechanism, so that between
+% database writes, we don't have to spawn a couch_view_updater with every view
+% request. This should give us more control, and the ability to request view
+% statuses eventually.
+
+% The caching mechanism: each request is submitted with a seq_id for the
+% database at the time it was read. We guarantee to return a view from that
+% sequence or newer.
+
+% If the request sequence is higher than our current target_seq, we record it
+% as the new target sequence. If the updater is not running, we launch it.
+
+handle_call({request_group, RequestSeq}, From, 
+        #group_state{
+            target_seq=TargetSeq, 
+            spawn_fun=SpawnFun,
+            updater_pid=Up,
+            waiting_list=WaitList
+            }=State) when RequestSeq > TargetSeq, Up == nil  ->    
+    UpdaterPid = SpawnFun(),
+    {noreply, State#group_state{
+        target_seq=RequestSeq, 
+        updater_pid=UpdaterPid,
+        waiting_list=[{From,RequestSeq}|WaitList]
+    }, infinity};
+
+handle_call({request_group, RequestSeq}, From, 
+        #group_state{
+            target_seq=TargetSeq,
+            waiting_list=WaitList
+            }=State) when RequestSeq > TargetSeq  ->
+    {noreply, State#group_state{
+        target_seq=RequestSeq, 
+        waiting_list=[{From,RequestSeq}|WaitList]
+    }, infinity};
+        
+
+% If the request sequence is less than or equal to the seq_id of a known Group,
+% we respond with that Group.
+handle_call({request_group, RequestSeq}, _From, 
+        State=#group_state{
+            group_seq=GroupSeq,
+            group=Group 
+            }) when RequestSeq =< GroupSeq  ->
+    {reply, {ok, Group}, State};
+
+% Otherwise: TargetSeq >= RequestSeq > GroupSeq
+% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq.
+handle_call({request_group, RequestSeq}, From,
+    #group_state{
+        waiting_list=WaitList
+        }=State) ->
+    {noreply, State#group_state{
+        waiting_list=[{From, RequestSeq}|WaitList]
+    }, infinity}.
+
+
+% When the updater finishes, it casts back a group carrying a seq_id; we store
+% that group and seq_id in our state. If our target_seq is still higher than
+% the returned group's seq, we start a new updater.
+
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, 
+        State=#group_state{
+            target_seq=TargetSeq, 
+            waiting_list=WaitList,
+            spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->
+    StillWaiting = reply_with_group(Group, WaitList, []),
+    UpdaterPid = SpawnFun(),
+    {noreply, State#group_state{ 
+        updater_pid=UpdaterPid,
+        waiting_list=StillWaiting,
+        group_seq=NewGroupSeq,
+        group=Group}};
+        
+handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
+        State=#group_state{waiting_list=WaitList}) ->
+    StillWaiting = reply_with_group(Group, WaitList, []),
+    {noreply, State#group_state{
+        updater_pid=nil,
+        waiting_list=StillWaiting,
+        group_seq=NewGroupSeq,
+        group=Group}}.
+   
+handle_info({'EXIT', _FromPid, normal}, State) ->
+    {noreply, State};
+    
+handle_info({'EXIT', FromPid, Reason}, State) ->
+    ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),
+    {stop, Reason, State};
+    
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->
+    lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),    
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Error handling? The updater could die on us, and we could save ourselves here,
+% but we shouldn't: it could be dead for a reason, e.g. because the view got changed.
+
+
+%% Local Functions
+
+% reply_with_group/3
+% for each item in the WaitingList {Pid, Seq}
+% if the Seq is =< GroupSeq, reply
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
+    gen_server:reply(Pid, {ok, Group}),
+    reply_with_group(Group, WaitList, StillWaiting);
+
+% else
+% put it in the continuing waiting list    
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
+    reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+
+% return the still waiting list
+reply_with_group(_Group, [], StillWaiting) ->
+    StillWaiting.
+
+spawn_updater({RootDir, DbName, GroupId}) -> 
+    spawn_link(couch_view_updater, update,
+        [RootDir, DbName, GroupId, self()]);
+
+spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->
+    spawn_link(couch_view_updater, temp_update,
+        [DbName, Fd, Lang, MapSrc, RedSrc, self()]).
+    
+

Added: incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=724946&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl Tue Dec  9 17:13:17 2008
@@ -0,0 +1,384 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_updater).
+
+-export([update/4, temp_update/6]).
+
+-include("couch_db.hrl").
+
+
+
+update(RootDir, DbName, GroupId, NotifyPid) ->
+    {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),
+    {ok, Db} = couch_db:open(DbName, []),
+    Result = update_group(Group#group{db=Db}),
+    ?LOG_DEBUG("update {Result} DONE ~p", [{Result}]),    
+    couch_db:close(Db),
+    case Result of
+    {same, Group2} ->
+        gen_server:cast(NotifyPid, {new_group, Group2});
+    {updated, Group2} ->
+        HeaderData = {Sig, get_index_header_data(Group2)},
+        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
+        gen_server:cast(NotifyPid, {new_group, Group2})
+    end,
+    garbage_collect().
+    
+temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->
+    case couch_db:open(DbName, []) of
+    {ok, Db} ->
+        View = #view{map_names=["_temp"],
+            id_num=0,
+            btree=nil,
+            def=MapSrc,
+            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
+        Group = #group{name="_temp",
+            db=Db,
+            views=[View],
+            current_seq=0,
+            def_lang=Lang,
+            id_btree=nil},
+        Group2 = init_group(Db, Fd, Group,nil),
+        couch_db:monitor(Db),
+        {_Updated, Group3} = update_group(Group2#group{db=Db}),
+        couch_db:close(Db),
+        gen_server:cast(NotifyPid, {new_group, Group3}),
+        garbage_collect();
+    Else ->
+        exit(Else)
+    end.
+
+
+update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->
+    ViewEmptyKVs = [{View, []} || View <- Group#group.views],
+    % compute on all docs modified since we last computed.
+    {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
+        = couch_db:enum_docs_since(
+            Db,
+            CurrentSeq,
+            fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
+            {[], Group, ViewEmptyKVs, []}
+            ),
+    {Group4, Results} = view_compute(Group3, UncomputedDocs),
+    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+    couch_query_servers:stop_doc_map(Group4#group.query_server),
+    NewSeq = couch_db:get_update_seq(Db),
+    if CurrentSeq /= NewSeq ->
+        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
+        {updated, Group5#group{query_server=nil}};
+    true ->
+        {same, Group4#group{query_server=nil}}
+    end.
+
+
+get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, 
+            id_btree=IdBtree,views=Views}) ->
+    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+    #index_header{seq=Seq,
+            purge_seq=PurgeSeq,
+            id_btree_state=couch_btree:get_state(IdBtree),
+            view_states=ViewStates}.
+
+
+purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+    {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+    Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
+    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+
+    % now populate the dictionary with all the keys to delete
+    ViewKeysToRemoveDict = lists:foldl(
+        fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->
+            lists:foldl(
+                fun({ViewNum, RowKey}, ViewDictAcc2) ->
+                    dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)
+                end, ViewDictAcc, ViewNumRowKeys);
+        ({not_found, _}, ViewDictAcc) ->
+            ViewDictAcc
+        end, dict:new(), Lookups),
+
+    % Now remove the values from the btrees
+    Views2 = lists:map(
+        fun(#view{id_num=Num,btree=Btree}=View) ->
+            case dict:find(Num, ViewKeysToRemoveDict) of
+            {ok, RemoveKeys} ->
+                {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),
+                View#view{btree=Btree2};
+            error -> % no keys to remove in this view
+                View
+            end
+        end, Views),
+    Group#group{id_btree=IdBtree2,
+            views=Views2,
+            purge_seq=couch_db:get_purge_seq(Db)}.
+
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->
+    % This fun computes once for each document        
+    #doc_info{id=DocId, deleted=Deleted} = DocInfo,
+    case DocId of
+    GroupId ->
+        % uh oh. this is the design doc with our definitions. See if
+        % anything in the definition changed.
+        case couch_db:open_doc(Db, DocInfo) of
+        {ok, Doc} ->
+            case design_doc_to_view_group(Doc) of
+            #group{sig=Sig} ->
+                % The same md5 signature, keep on computing
+                {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+            _ ->
+                ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),
+                throw(restart)
+            end;
+        {not_found, deleted} ->
+            ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),
+            throw(restart)
+        end;
+    <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
+        {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+    _ ->
+        {Docs2, DocIdViewIdKeys2} =
+        if Deleted ->
+            {Docs, [{DocId, []} | DocIdViewIdKeys]};
+        true ->
+            {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
+            {[Doc | Docs], DocIdViewIdKeys}
+        end,
+        
+        case couch_util:should_flush() of
+        true ->
+            {Group1, Results} = view_compute(Group, Docs2),
+            {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
+            {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
+                    DocInfo#doc_info.update_seq),
+            garbage_collect(),
+            ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
+            {ok, {[], Group2, ViewEmptyKeyValues, []}};
+        false ->
+            {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}}
+        end
+    end.
+
+view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
+    {ViewKVs, DocIdViewIdKeysAcc};
+view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
+    {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),
+    NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
+    view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
+
+
+view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
+    {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
+view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
+    % Take any identical keys and combine the values
+    ResultKVs2 = lists:foldl(
+        fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->
+            case Key == PrevKey of
+            true ->
+                case PrevVal of
+                {dups, Dups} ->
+                    [{PrevKey, {dups, [Value|Dups]}} | AccRest];
+                _ ->
+                    [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]
+                end;
+            false ->
+                [{Key,Value},{PrevKey,PrevVal}|AccRest]
+            end;
+        (KV, []) ->
+           [KV] 
+        end, [], lists:sort(ResultKVs)),
+    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],
+    NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
+    NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],
+    NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
+    view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
+
+view_compute(Group, []) ->
+    {Group, []};
+view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
+    {ok, QueryServer} =
+    case QueryServerIn of
+    nil -> % doc map not started
+        Definitions = [View#view.def || View <- Group#group.views],
+        couch_query_servers:start_doc_map(DefLang, Definitions);
+    _ ->
+        {ok, QueryServerIn}
+    end,
+    {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
+    {Group#group{query_server=QueryServer}, Results}.
+
+
+
+write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
+    #group{id_btree=IdBtree} = Group,
+
+    AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
+    RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
+    LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
+    {ok, LookupResults, IdBtree2}
+        = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),
+    KeysToRemoveByView = lists:foldl(
+        fun(LookupResult, KeysToRemoveByViewAcc) ->
+            case LookupResult of
+            {ok, {DocId, ViewIdKeys}} ->
+                lists:foldl(
+                    fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->
+                        dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)
+                    end,
+                    KeysToRemoveByViewAcc, ViewIdKeys);
+            {not_found, _} ->
+                KeysToRemoveByViewAcc
+            end
+        end,
+        dict:new(), LookupResults),
+
+    Views2 = [
+        begin
+            KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),
+            {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
+            View#view{btree = ViewBtree2}
+        end
+    ||
+        {View, AddKeyValues} <- ViewKeyValuesToAdd
+    ],
+    Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
+    {ok, Group2}.
+    
+prepare_group(RootDir, DbName, GroupId) ->
+    {Db, Group} = case (catch couch_db:open(DbName, [])) of
+    {ok, Db0} ->
+        case (catch couch_db:open_doc(Db0, GroupId)) of
+        {ok, Doc} ->
+            {Db0, design_doc_to_view_group(Doc)};
+        Else ->
+            delete_index_file(RootDir, DbName, GroupId),
+            ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),    
+            exit(Else)
+        end;
+    Else ->
+        delete_index_file(RootDir, DbName, GroupId),
+        exit(Else)
+    end,
+    FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ 
+            binary_to_list(GroupId) ++".view",
+    Group2 =
+    case couch_file:open(FileName) of
+    {ok, Fd} ->
+        Sig = Group#group.sig,
+        case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+        {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
+            % sigs match!
+            DbPurgeSeq = couch_db:get_purge_seq(Db),
+            % We can only use index with the same, or next purge seq as the db.
+            if DbPurgeSeq == PurgeSeq ->
+                init_group(Db, Fd, Group, HeaderInfo);
+            DbPurgeSeq == PurgeSeq + 1 ->
+                ?LOG_DEBUG("Purging entries from view index.",[]),
+                purge_index(init_group(Db, Fd, Group, HeaderInfo));
+            true ->
+                ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),
+                reset_file(Db, Fd, DbName, Group)
+            end;
+        _ ->
+            reset_file(Db, Fd, DbName, Group)
+        end;
+    {error, enoent} ->
+        case couch_file:open(FileName, [create]) of
+        {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
+        Error    -> throw(Error)
+        end
+    end,
+
+    couch_db:monitor(Db),
+    couch_db:close(Db),
+    {ok, Group2}.
+
+% maybe move to another module
+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
+    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
+    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
+
+    % add the views to a dictionary object, with the map source as the key
+    DictBySrc =
+    lists:foldl(
+        fun({Name, {MRFuns}}, DictBySrcAcc) ->
+            MapSrc = proplists:get_value(<<"map">>, MRFuns),
+            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
+            View =
+            case dict:find(MapSrc, DictBySrcAcc) of
+                {ok, View0} -> View0;
+                error -> #view{def=MapSrc} % create new view object
+            end,
+            View2 =
+            if RedSrc == null ->
+                View#view{map_names=[Name|View#view.map_names]};
+            true ->
+                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+            end,
+            dict:store(MapSrc, View2, DictBySrcAcc)
+        end, dict:new(), RawViews),
+    % number the views
+    {Views, _N} = lists:mapfoldl(
+        fun({_Src, View}, N) ->
+            {View#view{id_num=N},N+1}
+        end, 0, dict:to_list(DictBySrc)),
+
+    Group = #group{name=Id, views=Views, def_lang=Language},
+    Group#group{sig=erlang:md5(term_to_binary(Group))}.
+
+reset_group(#group{views=Views}=Group) ->
+    Views2 = [View#view{btree=nil} || View <- Views],
+    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+            id_btree=nil,views=Views2}.
+
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
+    ok = couch_file:truncate(Fd, 0),
+    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+    init_group(Db, Fd, reset_group(Group), nil).
+
+delete_index_file(RootDir, DbName, GroupId) ->
+    file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
+            ++ binary_to_list(GroupId) ++ ".view").
+
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+    init_group(Db, Fd, Group,
+        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+            id_btree_state=nil, view_states=[nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+     #index_header{seq=Seq, purge_seq=PurgeSeq,
+            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
+    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+    Views2 = lists:zipwith(
+        fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
+            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+            ReduceFun = 
+                fun(reduce, KVs) ->
+                    KVs2 = couch_view:expand_dups(KVs,[]),
+                    KVs3 = couch_view:detuple_kvs(KVs2,[]),
+                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, 
+                        KVs3),
+                    {length(KVs3), Reduced};
+                (rereduce, Reds) ->
+                    Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
+                        UserReds),
+                    {Count, Reduced}
+                end,
+            {ok, Btree} = couch_btree:open(BtreeState, Fd,
+                        [{less, fun couch_view:less_json_keys/2},
+                            {reduce, ReduceFun}]),
+            View#view{btree=Btree}
+        end,
+        ViewStates, Views),
+    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+        id_btree=IdBtree, views=Views2}.
\ No newline at end of file



Mime
View raw message