couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r643556 - in /incubator/couchdb/trunk/src/couchdb: couch_btree.erl couch_db.erl couch_stream.erl couch_util.erl couch_view.erl
Date Tue, 01 Apr 2008 20:32:17 GMT
Author: damien
Date: Tue Apr  1 13:32:15 2008
New Revision: 643556

URL: http://svn.apache.org/viewvc?rev=643556&view=rev
Log:
Fix for runaway process in the view code and the so-far-untested storage compaction code.

Modified:
    incubator/couchdb/trunk/src/couchdb/couch_btree.erl
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_stream.erl
    incubator/couchdb/trunk/src/couchdb/couch_util.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl

Modified: incubator/couchdb/trunk/src/couchdb/couch_btree.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_btree.erl?rev=643556&r1=643555&r2=643556&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_btree.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_btree.erl Tue Apr  1 13:32:15 2008
@@ -12,7 +12,7 @@
 
 -module(couch_btree).
 
--export([open/2, open/3, query_modify/4, add_remove/3, foldl/3, foldl/4]).
+-export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]).
 -export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]).
 -export([lookup/2, get_state/1, test/1, test/0]).
 
@@ -85,9 +85,12 @@
     {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun),
Acc),
     {ok, Acc2}.
 
+add(Bt, InsertKeyValues) ->
+    add_remove(Bt, InsertKeyValues, []).
+
 add_remove(Bt, InsertKeyValues, RemoveKeys) ->
-    {Result, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
-    {Result, Bt2}.
+    {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
+    {ok, Bt2}.
 
 query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
     #btree{root=Root} = Bt,

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=643556&r1=643555&r2=643556&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Tue Apr  1 13:32:15 2008
@@ -13,20 +13,21 @@
 -module(couch_db).
 -behaviour(gen_server).
 
--export([open/2,create/2,create/3,get_doc_info/2]).
+-export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]).
 -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
--export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
+-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
 -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
 -export([start_update_loop/1]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+-export([start_copy_compact_int/1,continue_copy_compact_int/2]).
 
 -include("couch_db.hrl").
 
 -record(db_header,
     {write_version = 0,
-     last_update_seq = 0,
+     update_seq = 0,
      summary_stream_state = nil,
-     docinfo_by_Id_btree_state = nil,
+     fulldocinfo_by_id_btree_state = nil,
      docinfo_by_seq_btree_state = nil,
      local_docs_btree_state = nil,
      doc_count=0,
@@ -34,20 +35,24 @@
     }).
 
 -record(db,
-    {main_pid,
-    update_pid,
+    {main_pid=nil,
+    update_pid=nil,
+    compactor_pid=nil,
     fd,
     header = #db_header{},
     summary_stream,
-    docinfo_by_Id_btree,
+    fulldocinfo_by_id_btree,
     docinfo_by_seq_btree,
     local_docs_btree,
-    last_update_seq,
+    update_seq,
     doc_count,
     doc_del_count,
     name
     }).
 
+% small value used in revision trees to indicate the revision isn't stored
+-define(REV_MISSING, []).
+
 start_link(DbName, Filepath, Options) ->
     case couch_file:open(Filepath, Options) of
     {ok, Fd} ->
@@ -72,6 +77,9 @@
 open(DbName, Filepath) ->
     start_link(DbName, Filepath, []).
 
+start_compact(MainPid) ->
+    gen_server:cast(MainPid, start_compact).
+
 delete_doc(MainPid, Id, Revisions) ->
     DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
     {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]),
@@ -128,15 +136,22 @@
 get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
     get_full_doc_infos(get_db(MainPid), Ids);
 get_full_doc_infos(#db{}=Db, Ids) ->
-    couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids).
+    couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
 
 get_db_info(MainPid) when is_pid(MainPid) ->
     get_db_info(get_db(MainPid));
-get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) ->
+get_db_info(Db) ->
+    #db{fd=Fd,
+        compactor_pid=Compactor,
+        doc_count=Count,
+        doc_del_count=DelCount,
+        update_seq=SeqNum} = Db,
     InfoList = [
         {doc_count, Count},
         {doc_del_count, DelCount},
-        {last_update_seq, SeqNum}
+        {last_update_seq, SeqNum},
+        {compacting, Compactor==nil},
+        {size, couch_file:bytes(Fd)}
         ],
     {ok, InfoList}.
 
@@ -315,21 +330,11 @@
 
 enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
     Db = get_db(MainPid),
-    couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc).
+    couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
 
 enum_docs(MainPid, StartId, InFun, Ctx) ->
     enum_docs(MainPid, StartId, fwd, InFun, Ctx).
 
-close(MainPid) ->
-    Ref = erlang:monitor(process, MainPid),
-    unlink(MainPid),
-    exit(MainPid, normal),
-    receive
-    {'DOWN', Ref, process, MainPid, _Reason} ->
-        ok
-    end.
-
-
 % server functions
 
 init({DbName, Fd, Options}) ->
@@ -339,12 +344,16 @@
         % create a new header and writes it to the file
         Header =  #db_header{},
         ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
-        ok = couch_file:sync(Fd),
-        init_main(DbName, Fd, Header);
+        ok = couch_file:sync(Fd);
     false ->
-        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),
-        init_main(DbName, Fd, Header)
-    end.
+        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
+    end,
+    
+    Db = init_db(DbName, Fd, Header),
+
+    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
+
+    {ok, Db#db{update_pid=UpdatePid}}.
 
 btree_by_seq_split(DocInfo) ->
     #doc_info{
@@ -374,10 +383,10 @@
     #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
     
 
-init_main(DbName, Fd, Header) ->
+init_db(DbName, Fd, Header) ->
     {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
     ok = couch_stream:set_min_buffer(SummaryStream, 10000),
-    {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd,
+    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
         [{split, fun(V) -> btree_by_name_split(V) end},
         {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
     {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
@@ -385,26 +394,22 @@
             {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
     {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
 
-    Db = #db{
+    #db{
         main_pid=self(),
         fd=Fd,
         header=Header,
         summary_stream = SummaryStream,
-        docinfo_by_Id_btree = IdBtree,
+        fulldocinfo_by_id_btree = IdBtree,
         docinfo_by_seq_btree = SeqBtree,
         local_docs_btree = LocalDocsBtree,
-        last_update_seq = Header#db_header.last_update_seq,
+        update_seq = Header#db_header.update_seq,
         doc_count = Header#db_header.doc_count,
         doc_del_count = Header#db_header.doc_del_count,
         name = DbName
-        },
-
-    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
-
-    {ok, Db#db{update_pid=UpdatePid}}.
+        }.
 
 terminate(_Reason, Db) ->
-    Db#db.update_pid ! close,
+    exit(Db#db.update_pid, kill),
     couch_file:close(Db#db.fd).
     
 handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
@@ -416,8 +421,17 @@
     {reply, ok, NewDb}.
 
 
-handle_cast(foo, Main) ->
-    {noreply, Main}.
+handle_cast(start_compact, #db{update_pid=Updater}=Db) ->
+    Updater ! compact,
+    {noreply, Db}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+handle_info(Msg, Db) ->
+    couch_log:error("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
+    exit({error, Msg}).
+
 
 %%% Internal function %%%
 
@@ -439,11 +453,40 @@
         Error ->
             exit(Error) % we crashed
         end;
-    close ->
-        % terminate loop
-        exit(normal)
+    compact ->
+        case Db#db.compactor_pid of
+        nil ->
+            Pid = spawn_link(couch_db, start_copy_compact_int, [Db]),
+            Db2 = Db#db{compactor_pid=Pid},
+            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+            update_loop(Db2);
+        _ ->
+            update_loop(Db) % already started
+        end;
+    {compact_done, #db{update_seq=CompactSeq}=NewDb} ->
+        case CompactSeq == Db#db.update_seq of
+        true ->
+            NewDb2 = swap_files(Db, NewDb),
+            update_loop(NewDb2#db{compactor_pid=nil});
+        false ->
+            Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]),
+            Db2 = Db#db{compactor_pid=Pid},
+            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+            update_loop(Db2)
+        end;
+    Else ->
+        couch_log:error("Unknown message received in db ~s:~p", [Db#db.name, Else]),
+        exit({error, Else})
     end.
 
+swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) ->
+    NormalFilename = couch_server:get_filename(Name),
+    true = file:rename(NormalFilename, NormalFilename ++ ".old"),
+    true = file:rename(NormalFilename ++ ".compact", NormalFilename),
+    couch_file:close(OldFd),
+    file:delete(NormalFilename ++ ".old"),
+    DbNew.
+
 get_db(MainPid) ->
     {ok, Db} = gen_server:call(MainPid, get_db),
     Db.
@@ -466,7 +509,7 @@
         FoundResults =
         lists:map(fun({Rev, Value, FoundRevPath}) ->
             case Value of
-            0 ->
+            ?REV_MISSING ->
                 % we have the rev in our list but know nothing about it
                 {{not_found, missing}, Rev};
             {IsDeleted, SummaryPtr} ->
@@ -538,7 +581,7 @@
 doc_to_tree(Doc, [RevId]) ->
     [{RevId, Doc, []}];
 doc_to_tree(Doc, [RevId | Rest]) ->
-    [{RevId, [], doc_to_tree(Doc, Rest)}].
+    [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
 
 make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
     {BodyData, BinValues} =
@@ -613,9 +656,9 @@
 
 update_docs_int(Db, DocsList, Options) ->
     #db{
-        docinfo_by_Id_btree = DocInfoByIdBTree,
+        fulldocinfo_by_id_btree = DocInfoByIdBTree,
         docinfo_by_seq_btree = DocInfoBySeqBTree,
-        last_update_seq = LastSeq,
+        update_seq = LastSeq,
         doc_count = FullDocCount,
         doc_del_count = FullDelCount
         } = Db,
@@ -678,9 +721,9 @@
     {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
 
     Db3 = Db2#db{
-        docinfo_by_Id_btree = DocInfoByIdBTree2,
+        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
         docinfo_by_seq_btree = DocInfoBySeqBTree2,
-        last_update_seq = NewSeq,
+        update_seq = NewSeq,
         doc_count = FullDocCount + NewDocsCount - OldCount,
         doc_del_count = FullDelCount + NewDelCount - OldDelCount
         },
@@ -689,7 +732,7 @@
     true ->
         {ok, Db3};
     false ->
-        commit_outstanding(Db3)
+        {ok, commit_data(Db3)}
     end.
 
 update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
@@ -697,20 +740,20 @@
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
         fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
-            BasedOnRev =
+            NewRev =
             case Revs of
                 [] -> 0;
-                [RevStr|_] -> list_to_integer(RevStr) - 1
+                [RevStr|_] -> list_to_integer(RevStr)
             end,
             OldRev =
             case OldDocLookup of
                 {ok, {_, {OldRev0, _}}} -> OldRev0;
                 not_found -> 0
             end,
-            case OldRev == BasedOnRev of
+            case OldRev + 1 == NewRev of
             true ->
                 case Delete of
-                    false -> {update, {Id, {OldRev+1, Body}}};
+                    false -> {update, {Id, {NewRev, Body}}};
                     true  -> {remove, Id}
                 end;
             false ->
@@ -729,29 +772,111 @@
 
 
 
-commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
+commit_data(#db{fd=Fd, header=Header} = Db) ->
     ok = couch_file:sync(Fd), % commit outstanding data
     Header2 = Header#db_header{
-        last_update_seq = Db#db.last_update_seq,
+        update_seq = Db#db.update_seq,
         summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
         docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
-        docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
+        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
         local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
         doc_count = Db#db.doc_count,
         doc_del_count = Db#db.doc_del_count
         },
-    ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
-    ok = couch_file:sync(Fd), % commit header to disk
-    Db2 = Db#db{
-        header = Header2
-        },
-    {ok, Db2}.
-
+    if Header == Header2 ->
+        Db; % unchanged. nothing to do
+    true ->
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
+        ok = couch_file:sync(Fd), % commit header to disk
+        Db#db{header = Header2}
+    end.
+
+copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
+    {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+    % copy the bin values
+    NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
+        {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd),
+        {Name, {Type, NewBinSp, Len}}
+        end, BinInfos),
+    % now write the document summary
+    {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}).
+
+copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+    [];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) ->
+    % This is a leaf node, copy it over
+    NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
+    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) ->
+    % inner node, only copy info/data from leaf nodes
+    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd,
DestFd, DestStream, RestTrees)].
+    
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
+    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+    NewFullDocInfos = lists:map(
+        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+        end, LookupResults),
+    NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
+    {ok, DocInfoBTree} =
+        couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
+    {ok, FullDocInfoBTree} =
+        couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+    NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
+
+
+          
+copy_compact_docs(Db, NewDb) ->
+    EnumBySeqFun =
+    fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
+        case couch_util:should_flush() of
+        true ->
+            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+            {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
+        false ->    
+            {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+        end
+    end,
+    {ok, {NewDb2, Uncopied}} =
+        couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun,
{NewDb, []}),
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
+    case Uncopied of
+    [#doc_info{update_seq=LastSeq} | _] ->
+        commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
+            lists:reverse(Uncopied)));
+    [] ->
+        NewDb2
+    end.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
+start_copy_compact_int(#db{name=Name}=Db) ->
+    couch_log:debug("New compaction process spawned for db \"%s\"", [Name]),
+    Filename = couch_server:get_compaction_filename(Name),
+    case couch_file:open(Filename) of
+    {ok, Fd} ->
+        couch_log:debug("Found existing compaction file for db \"%s\"", [Name]),
+        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>);
+    {error, enoent} -> %
+        {ok, Fd} = couch_file:open(Filename, [create]),
+        Header =  #db_header{},
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+        ok = couch_file:sync(Fd)
+    end,
+    NewDb = init_db(Name, Fd, Header),
+    NewDb2 = copy_compact_docs(Db, NewDb),
+    
+    % suck up all the local docs into memory and write them to the new db
+    {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+            fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+    {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+    NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}),
+    
+    NewDb3#db.update_pid ! {compact_done, NewDb3}.
+    
+continue_copy_compact_int(#db{name=Name}=Db, NewDb) ->
+    couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]),
+    NewDb2 = copy_compact_docs(Db, NewDb),
+    NewDb2#db.update_pid ! {compact_done, NewDb2}.
     
     
+    
\ No newline at end of file

Modified: incubator/couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_stream.erl?rev=643556&r1=643555&r2=643556&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_stream.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_stream.erl Tue Apr  1 13:32:15 2008
@@ -83,7 +83,7 @@
 copy(Fd, Sp, Num, DestStream) ->
     {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
         fun(Bin, AccPointer) ->
-            {ok, NewPointer} = write(Bin, DestStream),
+            {ok, NewPointer} = write(DestStream, Bin),
             if AccPointer == null -> NewPointer; true -> AccPointer end
         end,
         null),

Modified: incubator/couchdb/trunk/src/couchdb/couch_util.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_util.erl?rev=643556&r1=643555&r2=643556&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_util.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_util.erl Tue Apr  1 13:32:15 2008
@@ -14,7 +14,7 @@
 -behaviour(gen_server).
 
 -export([start_link/0,start_link/1]).
--export([parse_ini/1]).
+-export([parse_ini/1,should_flush/0, should_flush/1]).
 -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
 -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]).
 -export([encodeBase64/1, decodeBase64/1]).
@@ -22,6 +22,8 @@
 -export([init/1, terminate/2, handle_call/3]).
 -export([handle_cast/2,code_change/3,handle_info/2]).
 
+% arbitrarily chosen amount of memory to use before flushing to disk
+-define(FLUSH_MAX_MEM, 10000000).
 
 start_link() ->
     start_link("").
@@ -246,6 +248,22 @@
         [2] -> 0
     end.
 
+should_flush() ->
+    should_flush(?FLUSH_MAX_MEM).
+    
+should_flush(MemThreshHold) ->
+    case process_info(self(), memory) of
+    {memory, Mem} when Mem > 2*MemThreshHold ->
+        garbage_collect(),
+        case process_info(self(), memory) of
+        {memory, Mem} when Mem > MemThreshHold ->
+            true;
+        _ ->
+            false
+        end;
+    _ ->
+        false
+    end.
 
 
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=643556&r1=643555&r2=643556&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Tue Apr  1 13:32:15 2008
@@ -20,9 +20,6 @@
 
 -include("couch_db.hrl").
 
-% arbitrarily chosen amount of memory to use before flushing to disk
--define(FLUSH_MAX_MEM, 10000000).
-
 -record(group,
     {db,
     fd,
@@ -68,12 +65,11 @@
 	    receive
     	{Pid, Response} ->
     	    erlang:demonitor(Mref),
-    	    receive 
-    		{'DOWN', Mref, _, _, _} -> 
-    		    Response
-    	    after 0 -> 
-    		    Response
-    	    end;
+    	    receive
+        		{'DOWN', Mref, _, _, _} -> ok
+        	    after 0 -> ok
+    	    end,
+    	    Response;
     	{'DOWN', Mref, _, _, Reason} ->
     	    throw(Reason)
         end
@@ -201,7 +197,10 @@
     [{_, {DbName, GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId)
     end,
-    {noreply, Server}.
+    {noreply, Server};
+handle_info(Msg, _Server) ->
+    couch_log:error("Bad message received for view module: ~p", [Msg]),
+    exit({error, Msg}).
     
 add_to_ets(Pid, DbName, GroupId) ->
     true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),
@@ -216,11 +215,6 @@
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-start_update_loop(RootDir, DbName, GroupId) ->
-    % wait for a notify request before doing anything. This way, we can just
-    % exit and any exits will be noticed by the callers.
-    start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
-
 
 start_temp_update_loop(DbName, Fd, Lang, Query) ->
     NotifyPids = get_notify_pids(1000),
@@ -243,7 +237,12 @@
     {ok, Group2} = update_group(Group),
     [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
     garbage_collect(),
-    temp_update_loop(Group2, get_notify_pids(100000)).
+    temp_update_loop(Group2, get_notify_pids(10000)).
+
+start_update_loop(RootDir, DbName, GroupId) ->
+    % wait for a notify request before doing anything. This way, we can just
+    % exit and any exits will be noticed by the callers.
+    start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
     
 start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
     {Db, DefLang, Defs} =
@@ -284,13 +283,16 @@
     update_loop(Group2).
     
 update_loop(Group) ->
-    update_loop(Group, get_notify_pids()).
+    update_loop(Group, get_notify_pids(100000)).
 
 % wait for the first request to come in.
 get_notify_pids(Wait) ->
     receive
     {Pid, get_updated} ->
-        [Pid | get_notify_pids()]
+        [Pid | get_notify_pids()];
+    Else ->
+        couch_log:error("Unexpected message in view updater: ~p", [Else]),
+        exit({error, Else})
     after Wait ->
         exit(wait_timeout)
 	end.
@@ -526,15 +528,15 @@
             {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
             {[Doc | Docs], DocIdViewIdKeys}
         end,
-        case process_info(self(), memory) of
-        {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
+        case couch_util:should_flush() of
+        true ->
             {Group1, Results} = view_compute(Group, Docs2),
             {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs,
DocIdViewIdKeys2),
             {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),
             garbage_collect(),
             ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
             {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};
-        _Else ->
+        false ->
             {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}
         end
     end.



Mime
View raw message