couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r725909 - in /incubator/couchdb/trunk: etc/couchdb/local_dev.ini src/couchdb/couch_db.erl src/couchdb/couch_db.hrl src/couchdb/couch_file.erl src/couchdb/couch_view.erl src/couchdb/couch_view_group.erl src/couchdb/couch_view_updater.erl
Date Fri, 12 Dec 2008 05:23:37 GMT
Author: damien
Date: Thu Dec 11 21:23:37 2008
New Revision: 725909

URL: http://svn.apache.org/viewvc?rev=725909&view=rev
Log:
modifications to view server to keep the file descriptor open for the life of the view group.

Modified:
    incubator/couchdb/trunk/etc/couchdb/local_dev.ini
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_db.hrl
    incubator/couchdb/trunk/src/couchdb/couch_file.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl
    incubator/couchdb/trunk/src/couchdb/couch_view_group.erl
    incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl

Modified: incubator/couchdb/trunk/etc/couchdb/local_dev.ini
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/etc/couchdb/local_dev.ini?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/etc/couchdb/local_dev.ini (original)
+++ incubator/couchdb/trunk/etc/couchdb/local_dev.ini Thu Dec 11 21:23:37 2008
@@ -12,7 +12,7 @@
 ;bind_address = 127.0.0.1
 
 [log]
-level = debug
+level = error
 
 [update_notification]
 ;unique notifier name=/full/path/to/exe -with "cmd line arg"
@@ -48,3 +48,9 @@
 
 [test]
 foo = bar
+
+[test]
+foo = bar
+
+[test]
+foo = bar

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Thu Dec 11 21:23:37 2008
@@ -553,7 +553,7 @@
     {ok, FullDocInfo} ->
         open_doc_int(Db, FullDocInfo, Options);
     not_found ->
-        throw({not_found, missing})
+        {not_found, missing}
     end.
 
 doc_meta_info(DocInfo, RevTree, Options) ->

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.hrl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.hrl Thu Dec 11 21:23:37 2008
@@ -35,7 +35,7 @@
     end).
 
 -define(LOG_ERROR(Format, Args),
-    error_logger:info_report(couch_error, {Format, Args})).
+    error_logger:error_report(couch_error, {Format, Args})).
 
 -record(doc_info,
     {
@@ -162,7 +162,8 @@
     id_btree=nil,
     current_seq=0,
     purge_seq=0,
-    query_server=nil
+    query_server=nil,
+    commit_fun
     }).
 
 -record(view,

Modified: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Thu Dec 11 21:23:37 2008
@@ -33,23 +33,14 @@
     open(Filepath, []).
     
 open(Filepath, Options) ->
-    case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of
-    {ok, FdPid} ->
-        % we got back an ok, but that doesn't really mean it was successful.
-        % Instead the true status has been sent back to us as a message.
-        % We do this because if the gen_server doesn't initialize properly,
-        % it generates a crash report that will get logged. This avoids
-        % that mess, because we don't want crash reports generated
-        % every time a file cannot be found.
+    case gen_server:start_link(couch_file,
+            {Filepath, Options, self(), Ref = make_ref()}, []) of
+    {ok, Fd} ->
+        {ok, Fd};
+    ignore ->
+        % get the error
         receive
-        {FdPid, ok} ->
-            {ok, FdPid};
-        {FdPid, Error} ->
-            case process_info(self(), trap_exit) of
-            {trap_exit, true} ->
-                receive {'EXIT', FdPid, _} -> ok end;
-            _ -> ok
-            end,
+        {Ref, Error} ->
             Error
         end;
     Error ->
@@ -235,12 +226,12 @@
                 ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
                 {ok, Header1}
             end;
-        {error, Error} ->
+        Error ->
             % error reading second header. It's ok, but log it.
             ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
             {ok, Header1}
         end;
-    {error, Error} ->
+    Error ->
         % error reading primary header
         case extract_header(Prefix, Bin2) of
         {ok, Header2} ->
@@ -250,7 +241,7 @@
         _ ->
             % error reading secondary header too
             % return the error, no need to log anything as the caller will be responsible for dealing with the error.
-            {error, Error}
+            Error
         end
     end,
     case Result of
@@ -277,26 +268,20 @@
             Header = binary_to_term(TermBin),
             {ok, Header};
         false ->
-            {error, header_corrupt}
+            header_corrupt
         end;
     _ ->
-        {error, unknown_header_type}
+        unknown_header_type
     end.
 
 
-
-init_status_ok(ReturnPid, Fd) ->
-    ReturnPid ! {self(), ok}, % signal back ok
-    {ok, Fd}.
-
-init_status_error(ReturnPid, Error) ->
-    ReturnPid ! {self(), Error}, % signal back error status
-    gen_server:cast(self(), close), % tell ourself to close async
-    {ok, nil}.
+init_status_error(ReturnPid, Ref, Error) ->
+    ReturnPid ! {Ref, Error},
+    ignore.
 
 % server functions
 
-init({Filepath, Options, ReturnPid}) ->
+init({Filepath, Options, ReturnPid, Ref}) ->
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -312,16 +297,16 @@
                 true ->
                     {ok, 0} = file:position(Fd, 0),
                     ok = file:truncate(Fd),
-                    init_status_ok(ReturnPid, Fd);
+                    {ok, Fd};
                 false ->
                     ok = file:close(Fd),
-                    init_status_error(ReturnPid, file_exists)
+                    init_status_error(ReturnPid, Ref, file_exists)
                 end;
             false ->
-                init_status_ok(ReturnPid, Fd)
+                {ok, Fd}
             end;
         Error ->
-            init_status_error(ReturnPid, Error)
+            init_status_error(ReturnPid, Ref, Error)
         end;
     false ->
         % open in read mode first, so we don't create the file if it doesn't exist.
@@ -329,15 +314,13 @@
         {ok, Fd_Read} ->
             {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
             ok = file:close(Fd_Read),
-            init_status_ok(ReturnPid, Fd);
+            {ok, Fd};
         Error ->
-            init_status_error(ReturnPid, Error)
+            init_status_error(ReturnPid, Ref, Error)
         end
     end.
 
 
-terminate(_Reason, nil) ->
-    ok;
 terminate(_Reason, Fd) ->
     file:close(Fd),
     ok.
@@ -403,9 +386,7 @@
 
 handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
     {MonitorRef, _RefCount} = erase(Pid),
-    maybe_close_async(Fd);
-handle_info(Info, Fd) ->
-    exit({error, {Info, Fd}}).
+    maybe_close_async(Fd).
 
 
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Thu Dec 11 21:23:37 2008
@@ -27,8 +27,12 @@
     Pid.
 
 get_group_server(DbName, GroupId) ->
-    {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}),
-    Pid.
+    case gen_server:call(couch_view, {start_group_server, DbName, GroupId}) of
+    {ok, Pid} ->
+        Pid;
+    Error ->
+        throw(Error)
+    end.
     
 get_updated_group(DbName, GroupId, Update) ->
     couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)).
@@ -45,10 +49,10 @@
     {ok, {temp_reduce, View}};
 get_reduce_view({DbName, GroupId, Name}) ->
     case get_updated_group(DbName, GroupId, true) of
-    {error, Reason} ->
-        Reason;
     {ok, #group{views=Views,def_lang=Lang}} ->
-        get_reduce_view0(Name, Lang, Views)
+        get_reduce_view0(Name, Lang, Views);
+    Error ->
+        Error
     end.
 
 get_reduce_view0(_Name, _Lang, []) ->
@@ -124,10 +128,10 @@
     {ok, View};
 get_map_view({DbName, GroupId, Name, Update}) ->
     case get_updated_group(DbName, GroupId, Update) of
-    {error, Reason} ->
-        Reason;
     {ok, #group{views=Views}} ->
-        get_map_view0(Name, Views)
+        get_map_view0(Name, Views);
+    Error ->
+        Error
     end.
 
 get_map_view0(_Name, []) ->
@@ -217,7 +221,7 @@
             ok
         end,
         ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
-        {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}),
+        {ok, NewPid} = couch_view_group:start_link({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}),
         true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}),
         add_to_ets(NewPid, DbName, Name),
         NewPid;
@@ -226,17 +230,19 @@
     end,
     {reply, {ok, Pid}, Server};
 handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
-    Pid = 
     case ets:lookup(group_servers_by_name, {DbName, GroupId}) of
     [] ->
         ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),
-        {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}),
-        add_to_ets(NewPid, DbName, GroupId),
-        NewPid;
-    [{_, ExistingPid0}] ->
-        ExistingPid0
-    end,
-    {reply, {ok, Pid}, Server}.
+        case couch_view_group:start_link({view, Root, DbName, GroupId}) of
+        {ok, NewPid} ->
+            add_to_ets(NewPid, DbName, GroupId),
+            {reply, {ok, NewPid}, Server};
+        Error ->
+            {reply, Error, Server}
+        end;
+    [{_, ExistingPid}] ->
+        {reply, {ok, ExistingPid}, Server}
+    end.
 
 handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
     % shutdown all the updaters
@@ -254,14 +260,15 @@
     file:delete(Root ++ "/." ++ binary_to_list(DbName) ++ "_temp"),
     {noreply, Server}.
 
-handle_info({'EXIT', _FromPid, normal}, Server) ->
-   {noreply, Server};
 handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
-    ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]),
     case ets:lookup(couch_groups_by_updater, FromPid) of
-    [] -> % non-updater linked process must have died, we propagate the error
-        ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
-        exit(Reason);
+    [] ->
+        if Reason /= normal ->
+            % non-updater linked process died, we propagate the error
+            ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
+            exit(Reason);
+        true -> ok
+        end;
     [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId),
         [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName),
@@ -276,10 +283,7 @@
     [{_, {DbName, GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId)
     end,
-    {noreply, Server};
-handle_info(Msg, _Server) ->
-    ?LOG_ERROR("Bad message received for view module: ~p", [Msg]),
-    exit({error, Msg}).
+    {noreply, Server}.
     
 add_to_ets(Pid, DbName, GroupId) ->
     true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),

Modified: incubator/couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view_group.erl Thu Dec 11 21:23:37 2008
@@ -15,7 +15,7 @@
 
 %% API
 -export([start_link/1, request_group/2]).
-% -export([design_doc_to_view_group/1]).
+-export([design_doc_to_view_group/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -24,10 +24,9 @@
 -include("couch_db.hrl").
 	 
 -record(group_state, {
-    spawn_fun,
-    target_seq=0,
-    group_seq=0,
-    group=nil,
+    db_name,
+    init_args,
+    group,
     updater_pid=nil,
     waiting_list=[]
 }).
@@ -37,7 +36,6 @@
     ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
     case gen_server:call(Pid, {request_group, Seq}, infinity) of
     {ok, Group} ->
-        ?LOG_DEBUG("get_updated_group replied with group", []),
         {ok, Group};
     Else ->
         ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
@@ -47,15 +45,34 @@
 
 % from template
 start_link(InitArgs) ->
-    gen_server:start_link(couch_view_group, InitArgs, []).
+    case gen_server:start_link(couch_view_group,
+            {InitArgs, self(), Ref = make_ref()}, []) of
+    {ok, Pid}   -> {ok, Pid};
+    ignore      -> receive {Ref, Error} -> Error end;
+    Error       -> Error
+    end.
 
 % init differentiates between temp and design_doc views. It creates a closure
 % which spawns the appropriate view_updater. (It might also spawn the first
 % view_updater run.)
-init(InitArgs) ->
-    SpawnFun = fun() -> spawn_updater(InitArgs) end,
+init({InitArgs, ReturnPid, Ref}) ->
     process_flag(trap_exit, true),
-    {ok, #group_state{spawn_fun=SpawnFun}}.
+    case prepare_group(InitArgs, false) of
+    {ok, #group{db=Db}=Group} ->
+        couch_db:monitor(Db),
+        Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+        {ok, #group_state{
+                db_name=couch_db:name(Db),
+                init_args=InitArgs,
+                updater_pid = Pid,
+                group=Group}};
+    Error ->
+        ReturnPid ! {Ref, Error},
+        ignore
+    end.
+
+
+
 
 % There are two sources of messages: couch_view, which requests an up to date
 % view group, and the couch_view_updater, which when spawned, updates the
@@ -73,87 +90,92 @@
 
 handle_call({request_group, RequestSeq}, From, 
         #group_state{
-            target_seq=TargetSeq, 
-            spawn_fun=SpawnFun,
-            updater_pid=Up,
-            waiting_list=WaitList
-            }=State) when RequestSeq > TargetSeq, Up == nil  ->    
-    UpdaterPid = SpawnFun(),
-    {noreply, State#group_state{
-        target_seq=RequestSeq, 
-        updater_pid=UpdaterPid,
-        waiting_list=[{From,RequestSeq}|WaitList]
-    }, infinity};
-
-handle_call({request_group, RequestSeq}, From, 
-        #group_state{
-            target_seq=TargetSeq,
+            db_name=DbName,
+            group=#group{current_seq=Seq}=Group,
+            updater_pid=nil,
             waiting_list=WaitList
-            }=State) when RequestSeq > TargetSeq  ->
+            }=State) when RequestSeq > Seq ->
+    {ok, Db} = couch_db:open(DbName, []),
+    Group2 = Group#group{db=Db},
+    Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end),
+    
     {noreply, State#group_state{
-        target_seq=RequestSeq, 
+        updater_pid=Pid,
+        group=Group2,
         waiting_list=[{From,RequestSeq}|WaitList]
-    }, infinity};
+        }, infinity};
         
 
 % If the request seqence is less than or equal to the seq_id of a known Group,
 % we respond with that Group.
 handle_call({request_group, RequestSeq}, _From, 
-        State=#group_state{
-            group_seq=GroupSeq,
-            group=Group 
-            }) when RequestSeq =< GroupSeq  ->
+        #group_state{group=#group{current_seq=GroupSeq}=Group}=State)
+        when RequestSeq =< GroupSeq  ->
     {reply, {ok, Group}, State};
 
 % Otherwise: TargetSeq => RequestSeq > GroupSeq
 % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
 handle_call({request_group, RequestSeq}, From,
-    #group_state{
-        waiting_list=WaitList
-        }=State) ->
+        #group_state{waiting_list=WaitList}=State) ->
     {noreply, State#group_state{
         waiting_list=[{From, RequestSeq}|WaitList]
-    }, infinity}.
+        }, infinity}.
 
 
-% When the updater finishes, it will return a group with a seq_id, we should
-% store that group and seq_id in our state. If our high_target is higher than
-% the returned group, start a new updater.
-
-handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, 
-        State=#group_state{
-            target_seq=TargetSeq, 
-            waiting_list=WaitList,
-            spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->
-    StillWaiting = reply_with_group(Group, WaitList, []),
-    UpdaterPid = SpawnFun(),
-    {noreply, State#group_state{ 
-        updater_pid=UpdaterPid,
-        waiting_list=StillWaiting,
-        group_seq=NewGroupSeq,
-        group=Group}};
-        
-handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},
-        State=#group_state{waiting_list=WaitList}) ->
-    StillWaiting = reply_with_group(Group, WaitList, []),
-    {noreply, State#group_state{
-        updater_pid=nil,
-        waiting_list=StillWaiting,
-        group_seq=NewGroupSeq,
-        group=Group}}.
-   
+handle_cast(foo, State) ->
+    {ok, State}.
+
+
+handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
+        #group_state{db_name=DbName,
+            updater_pid=UpPid,
+            waiting_list=WaitList}=State) when UpPid == FromPid ->
+    ok = couch_db:close(Db),
+    case reply_with_group(Group, WaitList, []) of
+    [] ->
+        {noreply, State#group_state{waiting_list=[],
+                group=Group#group{db=nil},
+                updater_pid=nil}};
+    StillWaiting ->
+        % we still have some waiters, reopen the database and reupdate the index
+        {ok, Db2} = couch_db:open(DbName, []),
+        Group2 = Group#group{db=Db2},
+        Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
+        {noreply, State#group_state{waiting_list=StillWaiting,
+                group=Group2,
+                updater_pid=Pid}}
+    end;
+    
+handle_info({'EXIT', FromPid, reset}, 
+        #group_state{
+            init_args=InitArgs,
+            updater_pid=UpPid,
+            group=Group}=State) when UpPid == FromPid ->
+    ok = couch_db:close(Group#group.db),
+    case prepare_group(InitArgs, true) of
+    {ok, ResetGroup} ->
+        Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end),
+        {noreply, State#group_state{
+                updater_pid=Pid,
+                group=ResetGroup}};
+    Error ->
+        {stop, normal, reply_all(State, Error)}
+    end;
+    
 handle_info({'EXIT', _FromPid, normal}, State) ->
     {noreply, State};
     
 handle_info({'EXIT', FromPid, Reason}, State) ->
-    ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),
+    ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
     {stop, Reason, State};
     
-handle_info(_Info, State) ->
-    {noreply, State}.
+handle_info({'DOWN',_,_,_,_}, State) ->
+    ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
+    {stop, normal, reply_all(State, shutdown)}.
 
-terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->
-    lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList),
   
+
+terminate(Reason, #group_state{group=#group{fd=Fd}}=State) ->
+    reply_all(State, Reason),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -181,12 +203,165 @@
 reply_with_group(_Group, [], StillWaiting) ->
     StillWaiting.
 
-spawn_updater({RootDir, DbName, GroupId}) -> 
-    spawn_link(couch_view_updater, update,
-        [RootDir, DbName, GroupId, self()]);
-
-spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->
-    spawn_link(couch_view_updater, temp_update,
-        [DbName, Fd, Lang, MapSrc, RedSrc, self()]).
+reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
+    [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
+    State#group_state{waiting_list=[]}.
+
+prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
+    case open_db_group(DbName, GroupId) of
+    {ok, Db, #group{sig=Sig}=Group0} ->
+        case open_index_file(RootDir, DbName, GroupId) of
+        {ok, Fd} ->  
+            Group = Group0#group{
+                commit_fun = fun(GroupIn) ->
+                    Header = {Sig, get_index_header_data(GroupIn)},
+                    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Header)
+                end},
+            if ForceReset ->
+                {ok, reset_file(Db, Fd, DbName, Group)};
+            true ->
+                case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+                {ok, {Sig, HeaderInfo}} ->
+                    % sigs match!
+                    {ok, init_group(Db, Fd, Group, HeaderInfo)};
+                _ ->
+                    {ok, reset_file(Db, Fd, DbName, Group)}
+                end
+            end;
+        Error ->
+            catch delete_index_file(RootDir, DbName, GroupId),
+            Error
+        end;
+    Error ->
+        catch delete_index_file(RootDir, DbName, GroupId),
+        Error
+    end;
+prepare_group({temp_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) ->
+    case couch_db:open(DbName, []) of
+    {ok, Db} ->
+        View = #view{map_names=["_temp"],
+            id_num=0,
+            btree=nil,
+            def=MapSrc,
+            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
+        {ok, init_group(Db, Fd, #group{name="_temp", db=Db, views=[View],
+                    def_lang=Lang, commit_fun=fun(_G) -> ok end}, nil)};
+    Error ->
+        Error
+    end.
+
+
+get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, 
+            id_btree=IdBtree,views=Views}) ->
+    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+    #index_header{seq=Seq,
+            purge_seq=PurgeSeq,
+            id_btree_state=couch_btree:get_state(IdBtree),
+            view_states=ViewStates}.
+
+
+open_index_file(RootDir, DbName, GroupId) ->
+    FileName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId) ++".view",
+    case couch_file:open(FileName) of
+    {ok, Fd}        -> {ok, Fd};
+    {error, enoent} -> couch_file:open(FileName, [create]);
+    Error           -> Error
+    end.
     
+open_db_group(DbName, GroupId) ->
+    case couch_db:open(DbName, []) of
+    {ok, Db} ->
+        case couch_db:open_doc(Db, GroupId) of
+        {ok, Doc} ->
+            {ok, Db, design_doc_to_view_group(Doc)};
+        Else ->
+            couch_db:close(Db),
+            Else
+        end;
+    Else ->
+        Else
+    end.
+
+% maybe move to another module
+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
+    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
+    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
+
+    % add the views to a dictionary object, with the map source as the key
+    DictBySrc =
+    lists:foldl(
+        fun({Name, {MRFuns}}, DictBySrcAcc) ->
+            MapSrc = proplists:get_value(<<"map">>, MRFuns),
+            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
+            View =
+            case dict:find(MapSrc, DictBySrcAcc) of
+                {ok, View0} -> View0;
+                error -> #view{def=MapSrc} % create new view object
+            end,
+            View2 =
+            if RedSrc == null ->
+                View#view{map_names=[Name|View#view.map_names]};
+            true ->
+                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+            end,
+            dict:store(MapSrc, View2, DictBySrcAcc)
+        end, dict:new(), RawViews),
+    % number the views
+    {Views, _N} = lists:mapfoldl(
+        fun({_Src, View}, N) ->
+            {View#view{id_num=N},N+1}
+        end, 0, dict:to_list(DictBySrc)),
+
+    Group = #group{name=Id, views=Views, def_lang=Language},
+    Group#group{sig=erlang:md5(term_to_binary(Group))}.
+
+reset_group(#group{views=Views}=Group) ->
+    Views2 = [View#view{btree=nil} || View <- Views],
+    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+            id_btree=nil,views=Views2}.
+
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
+    ok = couch_file:truncate(Fd, 0),
+    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+    init_group(Db, Fd, reset_group(Group), nil).
+
+delete_index_file(RootDir, DbName, GroupId) ->
+    file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
+            ++ binary_to_list(GroupId) ++ ".view").
+
+init_group(Db, Fd, #group{views=Views}=Group, nil) ->
+    init_group(Db, Fd, Group,
+        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
+            id_btree_state=nil, view_states=[nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
+     #index_header{seq=Seq, purge_seq=PurgeSeq,
+            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
+    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+    Views2 = lists:zipwith(
+        fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
+            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+            ReduceFun = 
+                fun(reduce, KVs) ->
+                    KVs2 = couch_view:expand_dups(KVs,[]),
+                    KVs3 = couch_view:detuple_kvs(KVs2,[]),
+                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, 
+                        KVs3),
+                    {length(KVs3), Reduced};
+                (rereduce, Reds) ->
+                    Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
+                        UserReds),
+                    {Count, Reduced}
+                end,
+            {ok, Btree} = couch_btree:open(BtreeState, Fd,
+                        [{less, fun couch_view:less_json_keys/2},
+                            {reduce, ReduceFun}]),
+            View#view{btree=Btree}
+        end,
+        ViewStates, Views),
+    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+        id_btree=IdBtree, views=Views2}.
+
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=725909&r1=725908&r2=725909&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view_updater.erl Thu Dec 11 21:23:37 2008
@@ -12,83 +12,49 @@
 
 -module(couch_view_updater).
 
--export([update/4, temp_update/6]).
+-export([update/1]).
 
 -include("couch_db.hrl").
 
-
-
-update(RootDir, DbName, GroupId, NotifyPid) ->
-    {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),
-    {ok, Db} = couch_db:open(DbName, []),
-    Result = update_group(Group#group{db=Db}),
-    couch_db:close(Db),
-    case Result of
-    {same, Group2} ->
-        gen_server:cast(NotifyPid, {new_group, Group2});
-    {updated, Group2} ->
-        HeaderData = {Sig, get_index_header_data(Group2)},
-        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
-        gen_server:cast(NotifyPid, {new_group, Group2})
+update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
+        commit_fun=CommitFun}=Group) ->
+    ?LOG_DEBUG("Starting index update.",[]),
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
+    Group2 =
+    if DbPurgeSeq == PurgeSeq ->
+        Group;
+    DbPurgeSeq == PurgeSeq + 1 ->
+        ?LOG_DEBUG("Purging entries from view index.",[]),
+        purge_index(Group);
+    true ->
+        ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]),
+        exit(reset)
     end,
-    garbage_collect().
     
-temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->
-    case couch_db:open(DbName, []) of
-    {ok, Db} ->
-        View = #view{map_names=["_temp"],
-            id_num=0,
-            btree=nil,
-            def=MapSrc,
-            reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
-        Group = #group{name="_temp",
-            db=Db,
-            views=[View],
-            current_seq=0,
-            def_lang=Lang,
-            id_btree=nil},
-        Group2 = init_group(Db, Fd, Group,nil),
-        couch_db:monitor(Db),
-        {_Updated, Group3} = update_group(Group2#group{db=Db}),
-        couch_db:close(Db),
-        gen_server:cast(NotifyPid, {new_group, Group3}),
-        garbage_collect();
-    Else ->
-        exit(Else)
-    end.
-
-
-update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->
-    ViewEmptyKVs = [{View, []} || View <- Group#group.views],
+    ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
     % compute on all docs modified since we last computed.
     {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
         = couch_db:enum_docs_since(
             Db,
-            CurrentSeq,
+            Seq,
             fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
-            {[], Group, ViewEmptyKVs, []}
+            {[], Group2, ViewEmptyKVs, []}
             ),
     {Group4, Results} = view_compute(Group3, UncomputedDocs),
-    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
+            UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
     couch_query_servers:stop_doc_map(Group4#group.query_server),
     NewSeq = couch_db:get_update_seq(Db),
-    if CurrentSeq /= NewSeq ->
-        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
-        {updated, Group5#group{query_server=nil}};
+    if Seq /= NewSeq ->
+        {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+                NewSeq),
+        ok = CommitFun(Group5),
+        exit({new_group, Group5#group{query_server=nil}});
     true ->
-        {same, Group4#group{query_server=nil}}
+        exit({new_group, Group4#group{query_server=nil}})
     end.
 
 
-get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, 
-            id_btree=IdBtree,views=Views}) ->
-    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
-    #index_header{seq=Seq,
-            purge_seq=PurgeSeq,
-            id_btree_state=couch_btree:get_state(IdBtree),
-            view_states=ViewStates}.
-
-
 purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
     {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
     Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
@@ -120,7 +86,8 @@
             views=Views2,
             purge_seq=couch_db:get_purge_seq(Db)}.
 
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
+        DocIdViewIdKeys}) ->
     % This fun computes once for each document        
     #doc_info{id=DocId, deleted=Deleted} = DocInfo,
     case DocId of
@@ -129,17 +96,15 @@
         % anything in the definition changed.
         case couch_db:open_doc(Db, DocInfo) of
         {ok, Doc} ->
-            case design_doc_to_view_group(Doc) of
+            case couch_view_group:design_doc_to_view_group(Doc) of
             #group{sig=Sig} ->
                 % The same md5 signature, keep on computing
                 {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
             _ ->
-                ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),
-                throw(restart)
+                exit(reset)
             end;
         {not_found, deleted} ->
-            ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),
-            throw(restart)
+            exit(reset)
         end;
     <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
         {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
@@ -250,134 +215,4 @@
     ],
     Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
     {ok, Group2}.
-    
-prepare_group(RootDir, DbName, GroupId) ->
-    {Db, Group} = case (catch couch_db:open(DbName, [])) of
-    {ok, Db0} ->
-        case (catch couch_db:open_doc(Db0, GroupId)) of
-        {ok, Doc} ->
-            {Db0, design_doc_to_view_group(Doc)};
-        Else ->
-            delete_index_file(RootDir, DbName, GroupId),
-            ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),    
-            exit(Else)
-        end;
-    Else ->
-        delete_index_file(RootDir, DbName, GroupId),
-        exit(Else)
-    end,
-    FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ 
-            binary_to_list(GroupId) ++".view",
-    Group2 =
-    case couch_file:open(FileName) of
-    {ok, Fd} ->
-        Sig = Group#group.sig,
-        case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
-        {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
-            % sigs match!
-            DbPurgeSeq = couch_db:get_purge_seq(Db),
-            % We can only use index with the same, or next purge seq as the db.
-            if DbPurgeSeq == PurgeSeq ->
-                init_group(Db, Fd, Group, HeaderInfo);
-            DbPurgeSeq == PurgeSeq + 1 ->
-                ?LOG_DEBUG("Purging entries from view index.",[]),
-                purge_index(init_group(Db, Fd, Group, HeaderInfo));
-            true ->
-                ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),
-                reset_file(Db, Fd, DbName, Group)
-            end;
-        _ ->
-            reset_file(Db, Fd, DbName, Group)
-        end;
-    {error, enoent} ->
-        case couch_file:open(FileName, [create]) of
-        {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
-        Error    -> throw(Error)
-        end
-    end,
-
-    couch_db:monitor(Db),
-    couch_db:close(Db),
-    {ok, Group2}.
 
-% maybe move to another module
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
-    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
-    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
-
-    % add the views to a dictionary object, with the map source as the key
-    DictBySrc =
-    lists:foldl(
-        fun({Name, {MRFuns}}, DictBySrcAcc) ->
-            MapSrc = proplists:get_value(<<"map">>, MRFuns),
-            RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
-            View =
-            case dict:find(MapSrc, DictBySrcAcc) of
-                {ok, View0} -> View0;
-                error -> #view{def=MapSrc} % create new view object
-            end,
-            View2 =
-            if RedSrc == null ->
-                View#view{map_names=[Name|View#view.map_names]};
-            true ->
-                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
-            end,
-            dict:store(MapSrc, View2, DictBySrcAcc)
-        end, dict:new(), RawViews),
-    % number the views
-    {Views, _N} = lists:mapfoldl(
-        fun({_Src, View}, N) ->
-            {View#view{id_num=N},N+1}
-        end, 0, dict:to_list(DictBySrc)),
-
-    Group = #group{name=Id, views=Views, def_lang=Language},
-    Group#group{sig=erlang:md5(term_to_binary(Group))}.
-
-reset_group(#group{views=Views}=Group) ->
-    Views2 = [View#view{btree=nil} || View <- Views],
-    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
-            id_btree=nil,views=Views2}.
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
-    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
-    ok = couch_file:truncate(Fd, 0),
-    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
-    init_group(Db, Fd, reset_group(Group), nil).
-
-delete_index_file(RootDir, DbName, GroupId) ->
-    file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
-            ++ binary_to_list(GroupId) ++ ".view").
-
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
-    init_group(Db, Fd, Group,
-        #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
-            id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
-     #index_header{seq=Seq, purge_seq=PurgeSeq,
-            id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
-    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
-    Views2 = lists:zipwith(
-        fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
-            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
-            ReduceFun = 
-                fun(reduce, KVs) ->
-                    KVs2 = couch_view:expand_dups(KVs,[]),
-                    KVs3 = couch_view:detuple_kvs(KVs2,[]),
-                    {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, 
-                        KVs3),
-                    {length(KVs3), Reduced};
-                (rereduce, Reds) ->
-                    Count = lists:sum([Count0 || {Count0, _} <- Reds]),
-                    UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
-                    {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
-                        UserReds),
-                    {Count, Reduced}
-                end,
-            {ok, Btree} = couch_btree:open(BtreeState, Fd,
-                        [{less, fun couch_view:less_json_keys/2},
-                            {reduce, ReduceFun}]),
-            View#view{btree=Btree}
-        end,
-        ViewStates, Views),
-    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
-        id_btree=IdBtree, views=Views2}.
\ No newline at end of file



Mime
View raw message