couchdb-commits mailing list archives

From dav...@apache.org
Subject [couchdb] 01/03: Simplify compaction state management
Date Wed, 16 Aug 2017 19:56:26 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch compactor-rewrite
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b267ccceab85e6baf77178bbb1f234e76568dce1
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Wed Aug 16 12:17:40 2017 -0500

    Simplify compaction state management
    
    This change adds a new `#comp_st{}` record that is used to pass
    compaction state through the various compaction steps. There are zero
    changes to the existing compaction logic. This merely sets the stage for
    adding our docid copy optimization.
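
As a quick illustration of the pattern described above (not part of the patch
itself): each compaction stage becomes an arity-1 fun that takes and returns a
`#comp_st{}`, and the pipeline is a `lists:foldl/3` over the stage funs. Only
the record fields and the foldl idiom below come from the patch; the module
name and the stage bodies are placeholders.

    -module(comp_st_sketch).
    -export([run/0]).

    %% Same fields as the record added to couch_db_updater.erl.
    -record(comp_st, {old_db, new_db, meta_fd, retry}).

    run() ->
        Init = #comp_st{retry = nil},
        %% Placeholder stages; the real ones are copy_purge_info/1,
        %% copy_compact/1, etc., as listed in start_copy_compact/1 below.
        Stages = [
            fun(CompSt) -> CompSt#comp_st{new_db = opened} end,
            fun(CompSt) -> CompSt#comp_st{meta_fd = synced} end
        ],
        lists:foldl(fun(Stage, CompSt) -> Stage(CompSt) end, Init, Stages).
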
---
 src/couch/src/couch_db_updater.erl | 169 +++++++++++++++++++++++++++----------
 1 file changed, 124 insertions(+), 45 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index f786048..a83e3b4 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -23,9 +23,16 @@
 
 -define(IDLE_LIMIT_DEFAULT, 61000).
 
+-record(comp_st, {
+    old_db,
+    new_db,
+    meta_fd,
+    retry
+}).
+
 -record(comp_header, {
     db_header,
-    meta_state
+    meta_st
 }).
 
 -record(merge_st, {
@@ -1034,56 +1041,90 @@ check_md5(_, _) -> throw(md5_mismatch).
 
 start_copy_compact(#db{}=Db) ->
     erlang:put(io_priority, {db_compact, Db#db.name}),
-    #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
-    couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+    couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
+
+    {ok, InitCompSt} = open_compaction_files(Db),
+
+    Stages = [
+        fun copy_purge_info/1,
+        fun copy_compact/1,
+        fun commit_compaction_data/1,
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1,
+        fun compact_final_sync/1
+    ],
 
-    {ok, NewDb, DName, DFd, MFd, Retry} =
-        open_compaction_files(Name, Header, Filepath, Options),
-    erlang:monitor(process, MFd),
+    FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
+        Stage(CompSt)
+    end, InitCompSt, Stages),
 
-    % This is a bit worrisome. init_db/4 will monitor the data fd
-    % but it doesn't know about the meta fd. For now I'll maintain
-    % that the data fd is the old normal fd and meta fd is special
-    % and hope everything works out for the best.
-    unlink(DFd),
+    #comp_st{
+        new_db = FinalNewDb,
+        meta_fd = MetaFd
+    } = FinalCompSt,
 
-    NewDb1 = copy_purge_info(Db, NewDb),
-    NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
-    NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
-    NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
-    close_db(NewDb6),
+    close_db(FinalNewDb),
+    ok = couch_file:close(MetaFd),
 
-    ok = couch_file:close(MFd),
-    gen_server:cast(Db#db.main_pid, {compact_done, DName}).
+    gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
 
 
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
+open_compaction_files(OldDb) ->
+    #db{
+        name = DbName,
+        filepath = DbFilePath,
+        options = Options,
+        header = SrcHdr
+    } = OldDb,
     DataFile = DbFilePath ++ ".compact.data",
     MetaFile = DbFilePath ++ ".compact.meta",
     {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
     {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
     DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
-    case {DataHdr, MetaHdr} of
+    CompSt = case {DataHdr, MetaHdr} of
         {#comp_header{}=A, #comp_header{}=A} ->
+            % We're restarting a compaction that did not finish
+            % before trying to swap out with the original db
             DbHeader = A#comp_header.db_header,
             Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
-            Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
-            {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+            Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = Db0#db.id_tree
+            };
         _ when DataHdrIsDbHdr ->
+            % We tried to swap out the compaction but there were
+            % writes to the database during compaction. Start
+            % a compaction retry.
             ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
             Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
             Db1 = bind_emsort(Db0, MetaFd, nil),
-            {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = Db0#db.id_tree
+            };
         _ ->
+            % We're starting a compaction from scratch
             Header = couch_db_header:from(SrcHdr),
             ok = reset_compaction_file(DataFd, Header),
             ok = reset_compaction_file(MetaFd, Header),
             Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
             Db1 = bind_emsort(Db0, MetaFd, nil),
-            {ok, Db1, DataFile, DataFd, MetaFd, nil}
-    end.
+            #comp_st{
+                old_db = OldDb,
+                new_db = Db1,
+                meta_fd = MetaFd,
+                retry = nil
+            }
+    end,
+    unlink(DataFd),
+    erlang:monitor(process, MetaFd),
+    {ok, CompSt}.
 
 
 open_compaction_file(FilePath) ->
@@ -1104,25 +1145,34 @@ reset_compaction_file(Fd, Header) ->
     ok = couch_file:write_header(Fd, Header).
 
 
-copy_purge_info(OldDb, NewDb) ->
+copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
     OldHdr = OldDb#db.header,
     NewHdr = NewDb#db.header,
     OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
-    if OldPurgeSeq > 0 ->
+    NewPurgeSeq = couch_db_header:purge_seq(NewHdr),
+    if OldPurgeSeq > NewPurgeSeq ->
         {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
         Opts = [{compression, NewDb#db.compression}],
         {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
-        NewNewHdr = couch_db_header:set(NewHdr, [
-            {purge_seq, OldPurgeSeq},
-            {purged_docs, Ptr}
-        ]),
-        NewDb#db{header = NewNewHdr};
+        CompSt#comp_st{
+            new_db = NewDb#db{
+                header = couch_db_header:set(NewHdr, [
+                    {purge_seq, OldPurgeSeq},
+                    {purged_docs, Ptr}
+                ])
+            }
+        };
     true ->
-        NewDb
+        CompSt
     end.
 
 
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(#comp_st{} = CompSt) ->
+    #comp_st{
+        old_db = Db,
+        new_db = NewDb0,
+        retry = Retry
+    } = CompSt,
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
@@ -1160,12 +1210,13 @@ copy_compact(Db, NewDb0, Retry) ->
 
     TaskProps0 = [
         {type, database_compaction},
+        {retry, (Retry /= nil)},
         {database, Db#db.name},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
     ],
-    case (Retry =/= nil) and couch_task_status:is_task_added() of
+    case (Retry /= nil) and couch_task_status:is_task_added() of
     true ->
         couch_task_status:update([
             {retry, true},
@@ -1195,7 +1246,11 @@ copy_compact(Db, NewDb0, Retry) ->
         NewDb4 = NewDb3
     end,
 
-    commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+    CompSt#comp_st{
+        new_db = NewDb4#db{
+            update_seq = Db#db.update_seq
+        }
+    }.
 
 
 copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
@@ -1310,12 +1365,15 @@ copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
     {BodyData, NewBinInfos}.
 
 
-commit_compaction_data(#db{}=Db) ->
+commit_compaction_data(#comp_st{new_db = Db} = CompSt) ->
     % Compaction needs to write headers to both the data file
     % and the meta file so if we need to restart we can pick
     % back up from where we left off.
     commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
-    commit_compaction_data(Db, Db#db.fd).
+    NewDb = commit_compaction_data(Db, Db#db.fd),
+    CompSt#comp_st{
+        new_db = NewDb
+    }.
 
 
 commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
@@ -1330,7 +1388,7 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
     Header = db_to_header(Db1, OldHeader),
     CompHeader = #comp_header{
         db_header = Header,
-        meta_state = MetaState
+        meta_st = MetaState
     },
     ok = couch_file:sync(Fd),
     ok = couch_file:write_header(Fd, CompHeader),
@@ -1359,12 +1417,20 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
+sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+    CompSt#comp_st{
+        new_db = Db0#db{
+            id_tree = Ems
+        }
+    }.
 
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
+    #db{
+        fd = Fd,
+        header = Header
+    } = Db,
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1384,7 +1450,20 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
-    Db#db{id_tree=IdTree, seq_tree=SeqTree}.
+    CompSt#comp_st{
+        new_db = Db#db{
+            id_tree = IdTree,
+            seq_tree = SeqTree
+        }
+    }.
+
+
+compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+    NewHdr = db_to_header(NewDb0, NewDb0#db.header),
+    NewDb1 = sync_header(NewDb0, NewHdr),
+    CompSt#comp_st{
+        new_db = NewDb1
+    }.
 
 
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
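
With this structure in place, a follow-up commit can add a step by inserting
one more arity-1 fun into the Stages list in start_copy_compact/1, without
touching the surrounding plumbing. A hypothetical example (the copy_docids/1
name below is illustrative, not something this patch adds):

    Stages = [
        fun copy_purge_info/1,
        fun copy_docids/1,        % hypothetical docid-copy stage
        fun copy_compact/1,
        fun commit_compaction_data/1,
        fun sort_meta_data/1,
        fun commit_compaction_data/1,
        fun copy_meta_data/1,
        fun compact_final_sync/1
    ],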

