couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r644593 - in /incubator/couchdb/trunk: share/www/script/couch_tests.js src/couchdb/couch_db.erl src/couchdb/couch_file.erl src/couchdb/couch_stream.erl src/couchdb/couch_view.erl src/couchdb/mod_couch.erl
Date Fri, 04 Apr 2008 03:10:35 GMT
Author: damien
Date: Thu Apr  3 20:10:34 2008
New Revision: 644593

URL: http://svn.apache.org/viewvc?rev=644593&view=rev
Log:
compaction code, not hooked up to webserver yet

Modified:
    incubator/couchdb/trunk/share/www/script/couch_tests.js
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_file.erl
    incubator/couchdb/trunk/src/couchdb/couch_stream.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl
    incubator/couchdb/trunk/src/couchdb/mod_couch.erl

Modified: incubator/couchdb/trunk/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch_tests.js?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
Binary files - no diff available.

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Thu Apr  3 20:10:34 2008
@@ -17,9 +17,9 @@
 -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
 -export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
 -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
--export([start_update_loop/1]).
+-export([start_update_loop/2]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/1,continue_copy_compact_int/2]).
+-export([start_copy_compact_int/2]).
 
 -include("couch_db.hrl").
 
@@ -47,24 +47,52 @@
     update_seq,
     doc_count,
     doc_del_count,
-    name
+    name,
+    filepath
     }).
 
 % small value used in revision trees to indicate the revision isn't stored
 -define(REV_MISSING, []).
 
 start_link(DbName, Filepath, Options) ->
+    catch start_link0(DbName, Filepath, Options).
+        
+start_link0(DbName, Filepath, Options) ->
+     % first delete the old file previous compaction
+    Fd = 
     case couch_file:open(Filepath, Options) of
-    {ok, Fd} ->
-         Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
-         unlink(Fd),
-         Result;
+    {ok, Fd0} ->
+        Fd0;
     {error, enoent} ->
-        % couldn't find file
-        {error, not_found};
+        % couldn't find file. is there a compact version? This can happen if
+        % crashed during the file switch.
+        case couch_file:open(Filepath ++ ".compact") of
+        {ok, Fd0} ->
+            couch_log:info("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
+            ok = file:rename(Filepath ++ ".compact", Filepath),
+            Fd0;
+        {error, enoent} ->
+            throw({error, notfound})
+        end;
     Else ->
-        Else
-    end.
+        throw(Else)
+    end,
+    
+    StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
+    unlink(Fd),
+    case StartResult of
+    {ok, _} ->
+        % We successfully opened the db, delete old storage files if around
+        case file:delete(Filepath ++ ".old") of
+        ok ->
+            couch_log:info("Deleted old storage file ~s~s", [Filepath, ".old"]);
+        {error, enoent} ->
+            ok  % normal result
+        end;
+    _ ->
+        ok
+    end,
+    StartResult.
 
 %%% Interface functions %%%
 
@@ -146,12 +174,13 @@
         doc_count=Count,
         doc_del_count=DelCount,
         update_seq=SeqNum} = Db,
+    {ok, Size} = couch_file:bytes(Fd),
     InfoList = [
         {doc_count, Count},
         {doc_del_count, DelCount},
-        {last_update_seq, SeqNum},
-        {compacting, Compactor==nil},
-        {size, couch_file:bytes(Fd)}
+        {update_seq, SeqNum},
+        {compacting, Compactor/=nil},
+        {size, Size}
         ],
     {ok, InfoList}.
 
@@ -337,23 +366,12 @@
 
 % server functions
 
-init({DbName, Fd, Options}) ->
-    link(Fd),
-    case lists:member(create, Options) of
-    true ->
-        % create a new header and writes it to the file
-        Header =  #db_header{},
-        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
-        ok = couch_file:sync(Fd);
-    false ->
-        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
-    end,
-    
-    Db = init_db(DbName, Fd, Header),
-
-    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
-
-    {ok, Db#db{update_pid=UpdatePid}}.
+init(InitArgs) ->
+    spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
+    receive
+    {initialized, Db} ->
+        {ok, Db}
+    end.
 
 btree_by_seq_split(DocInfo) ->
     #doc_info{
@@ -383,7 +401,7 @@
     #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
     
 
-init_db(DbName, Fd, Header) ->
+init_db(DbName, Filepath, Fd, Header) ->
     {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
     ok = couch_stream:set_min_buffer(SummaryStream, 10000),
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -395,7 +413,7 @@
     {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
 
     #db{
-        main_pid=self(),
+        update_pid=self(),
         fd=Fd,
         header=Header,
         summary_stream = SummaryStream,
@@ -405,12 +423,16 @@
         update_seq = Header#db_header.update_seq,
         doc_count = Header#db_header.doc_count,
         doc_del_count = Header#db_header.doc_del_count,
-        name = DbName
+        name = DbName,
+        filepath=Filepath
         }.
 
+close_db(#db{fd=Fd,summary_stream=Ss}) ->
+    couch_file:close(Fd),
+    couch_stream:close(Ss).
+    
 terminate(_Reason, Db) ->
-    exit(Db#db.update_pid, kill),
-    couch_file:close(Db#db.fd).
+    exit(Db#db.update_pid, kill).
     
 handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
     Updater ! {From, update_docs, DocActions, Options},
@@ -435,17 +457,34 @@
 
 %%% Internal function %%%
 
-start_update_loop(Db) ->
-    update_loop(Db#db{update_pid=self()}).
+start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
+    link(Fd),
+    
+    case lists:member(create, Options) of
+    true ->
+        % create a new header and writes it to the file
+        Header =  #db_header{},
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+        % delete any old compaction files that might be hanging around
+        file:delete(Filepath ++ ".compact"),
+        file:delete(Filepath ++ ".old");
+    false ->
+        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
+    end,
+    
+    Db = init_db(DbName, Filepath, Fd, Header),
+    Db2 = Db#db{main_pid=MainPid},
+    MainPid ! {initialized, Db2},
+    update_loop(Db2).
     
-update_loop(Db) ->
+update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
     receive
     {OrigFrom, update_docs, DocActions, Options} ->
         case (catch update_docs_int(Db, DocActions, Options)) of
         {ok, Db2} ->
-            ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+            ok = gen_server:call(MainPid, {db_updated, Db2}),
             gen_server:reply(OrigFrom, ok),
-            couch_db_update_notifier:notify({updated, Db2#db.name}),
+            couch_db_update_notifier:notify({updated, Name}),
             update_loop(Db2);
         conflict ->
             gen_server:reply(OrigFrom, conflict),
@@ -456,22 +495,40 @@
     compact ->
         case Db#db.compactor_pid of
         nil ->
-            Pid = spawn_link(couch_db, start_copy_compact_int, [Db]),
+            couch_log:info("Starting compaction for db \"~s\"", [Name]),
+            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
             Db2 = Db#db{compactor_pid=Pid},
-            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+            ok = gen_server:call(MainPid, {db_updated, Db2}),
             update_loop(Db2);
         _ ->
             update_loop(Db) % already started
         end;
-    {compact_done, #db{update_seq=CompactSeq}=NewDb} ->
-        case CompactSeq == Db#db.update_seq of
+    {compact_done, CompactFilepath} ->
+        {ok, NewFd} = couch_file:open(CompactFilepath),
+        {ok, NewHeader} = couch_file:read_header(NewFd, <<$g, $m, $k, 0>>),
+        #db{update_seq=NewSeq}= NewDb =
+                init_db(Name, CompactFilepath, NewFd, NewHeader),
+        case Db#db.update_seq == NewSeq of
         true ->
-            NewDb2 = swap_files(Db, NewDb),
+            couch_log:debug("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+            ok = file:rename(Filepath, Filepath ++ ".old"),
+            ok = file:rename(CompactFilepath, Filepath),
+            
+            NewDb2 = NewDb#db{
+                main_pid = Db#db.main_pid,
+                doc_count = Db#db.doc_count,
+                doc_del_count = Db#db.doc_del_count,
+                filepath = Filepath},
+            close_db(Db),
+            ok = gen_server:call(MainPid, {db_updated, NewDb2}),
+            couch_log:info("Compaction for db ~p completed.", [Name]),
             update_loop(NewDb2#db{compactor_pid=nil});
         false ->
-            Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]),
+            couch_log:info("Compaction file still behind main file "
+                "(update seq=~p. compact update seq=~p). Retrying.",
+                [Db#db.update_seq, NewSeq]),
+            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
             Db2 = Db#db{compactor_pid=Pid},
-            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
             update_loop(Db2)
         end;
     Else ->
@@ -479,14 +536,6 @@
         exit({error, Else})
     end.
 
-swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) ->
-    NormalFilename = couch_server:get_filename(Name),
-    true = file:rename(NormalFilename, NormalFilename ++ ".old"),
-    true = file:rename(NormalFilename ++ ".compact", NormalFilename),
-    couch_file:close(OldFd),
-    file:delete(NormalFilename ++ ".old"),
-    DbNew.
-
 get_db(MainPid) ->
     {ok, Db} = gen_server:call(MainPid, get_db),
     Db.
@@ -773,7 +822,6 @@
 
 
 commit_data(#db{fd=Fd, header=Header} = Db) ->
-    ok = couch_file:sync(Fd), % commit outstanding data
     Header2 = Header#db_header{
         update_seq = Db#db.update_seq,
         summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
@@ -787,7 +835,6 @@
         Db; % unchanged. nothing to do
     true ->
         ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
-        ok = couch_file:sync(Fd), % commit header to disk
         Db#db{header = Header2}
     end.
 
@@ -795,21 +842,22 @@
     {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
     % copy the bin values
     NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd),
+        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
         {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
     % now write the document summary
-    {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}).
+    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+    Sp.
 
 copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
     [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) ->
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
     % This is a leaf node, copy it over
     NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) ->
+    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
     % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)].
+    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
     
 copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
@@ -849,34 +897,33 @@
         NewDb2
     end.
 
-start_copy_compact_int(#db{name=Name}=Db) ->
-    couch_log:debug("New compaction process spawned for db \"%s\"", [Name]),
-    Filename = couch_server:get_compaction_filename(Name),
-    case couch_file:open(Filename) of
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+    CompactFile = Filepath ++ ".compact",
+    couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+    case couch_file:open(CompactFile) of
     {ok, Fd} ->
-        couch_log:debug("Found existing compaction file for db \"%s\"", [Name]),
+        couch_log:debug("Found existing compaction file for db \"~s\"", [Name]),
         {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>);
     {error, enoent} -> %
-        {ok, Fd} = couch_file:open(Filename, [create]),
+        {ok, Fd} = couch_file:open(CompactFile, [create]),
         Header =  #db_header{},
-        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
-        ok = couch_file:sync(Fd)
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header)
     end,
-    NewDb = init_db(Name, Fd, Header),
+    NewDb = init_db(Name, CompactFile, Fd, Header),
     NewDb2 = copy_compact_docs(Db, NewDb),
-    
-    % suck up all the local docs into memory and write them to the new db
-    {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
-            fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
-    {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
-    NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}),
-    
-    NewDb3#db.update_pid ! {compact_done, NewDb3}.
-    
-continue_copy_compact_int(#db{name=Name}=Db, NewDb) ->
-    couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]),
-    NewDb2 = copy_compact_docs(Db, NewDb),
-    NewDb2#db.update_pid ! {compact_done, NewDb2}.
+    NewDb3 =
+    case CopyLocal of
+    true ->
+        % suck up all the local docs into memory and write them to the new db
+        {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+        {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+        commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+    _ ->
+        NewDb2
+    end,
+    close_db(NewDb3),
+    Db#db.update_pid ! {compact_done, CompactFile}.
     
     
     

Modified: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Thu Apr  3 20:10:34 2008
@@ -142,8 +142,7 @@
 
 
 write_header(Fd, Prefix, Data) ->
-    % The leading bytes in every db file, the sig and the file version:
-    %the actual header data
+    ok = sync(Fd),
     TermBin = term_to_binary(Data),
     % the size of all the bytes written to the header, including the md5 signature (16 bytes)
     FilledSize = size(Prefix) + size(TermBin) + 16,
@@ -159,7 +158,8 @@
         WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
         ?HEADER_SIZE = size(WriteBin), % sanity check
         DblWriteBin = [WriteBin, WriteBin],
-        ok = pwrite(Fd, 0, DblWriteBin)
+        ok = pwrite(Fd, 0, DblWriteBin),
+        ok = sync(Fd)
     end.
 
 

Modified: incubator/couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_stream.erl?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_stream.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_stream.erl Thu Apr  3 20:10:34 2008
@@ -15,7 +15,7 @@
 
 -export([test/1]).
 -export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4]).
+-export([copy/4, copy_to_new_stream/4]).
 -export([ensure_buffer/2, set_min_buffer/2]).
 -export([init/1, terminate/2, handle_call/3]).
 -export([handle_cast/2,code_change/3,handle_info/2]).
@@ -78,10 +78,16 @@
     Bin = list_to_binary(lists:reverse(RevBin)),
     {ok, Bin, Sp2}.
 
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
-    copy(Fd, Sp, Num, DestStream);
-copy(Fd, Sp, Num, DestStream) ->
-    {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+copy_to_new_stream(Src, Sp, Len, DestFd) ->
+    Dest = open(DestFd),
+    {ok, NewSp} = copy(Src, Sp, Len, Dest),
+    close(Dest),
+    {ok, NewSp}.
+
+copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
+    copy(Fd, Sp, Len, DestStream);
+copy(Fd, Sp, Len, DestStream) ->
+    {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
         fun(Bin, AccPointer) ->
             {ok, NewPointer} = write(DestStream, Bin),
             if AccPointer == null -> NewPointer; true -> AccPointer end

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Thu Apr  3 20:10:34 2008
@@ -179,9 +179,12 @@
     file:delete(Root ++ "/." ++ DbName ++ "_temp"),
     {noreply, Server}.
 
+handle_info({'EXIT', _FromPid, normal}, Server) ->
+    {noreply, Server};
 handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
     case ets:lookup(couch_views_by_updater, FromPid) of
     [] -> % non-updater linked process must have died, we propagate the error
+        couch_log:error("Exit on non-updater process: ~p", [Reason]),
         exit(Reason);
     [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
         delete_from_ets(FromPid, DbName, GroupId),

Modified: incubator/couchdb/trunk/src/couchdb/mod_couch.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/mod_couch.erl?rev=644593&r1=644592&r2=644593&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/mod_couch.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/mod_couch.erl Thu Apr  3 20:10:34 2008
@@ -500,14 +500,7 @@
 send_database_info(Mod, #uri_parts{db=DbName}=Parts) ->
     Db = open_db(Parts),
     {ok, InfoList} = couch_db:get_db_info(Db),
-    ok = send_header(Mod, 200, resp_json_header(Mod)),
-    DocCount = proplists:get_value(doc_count, InfoList),
-    LastUpdateSequence = proplists:get_value(last_update_seq, InfoList),
-    ok = send_chunk(Mod, "{\"db_name\": \"" ++ DbName ++
-        "\", \"doc_count\":" ++ integer_to_list(DocCount) ++
-        ", \"update_seq\":" ++ integer_to_list(LastUpdateSequence)++"}"),
-    ok = send_final_chunk(Mod),
-    {ok, 200}.
+    send_json(Mod, 200, {obj, [{db_name, DbName} | InfoList]}).
 
 send_doc(#mod{parsed_header=Headers}=Mod,
         #uri_parts{doc=DocId,querystr=QueryStr}=Parts) ->



Mime
View raw message