Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D0C61200C22 for ; Tue, 21 Feb 2017 18:46:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CF73B160B68; Tue, 21 Feb 2017 17:46:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8AFEC160B7F for ; Tue, 21 Feb 2017 18:46:19 +0100 (CET) Received: (qmail 32308 invoked by uid 500); 21 Feb 2017 17:46:18 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 31688 invoked by uid 99); 21 Feb 2017 17:46:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Feb 2017 17:46:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 02239E0082; Tue, 21 Feb 2017 17:46:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davisp@apache.org To: commits@couchdb.apache.org Date: Tue, 21 Feb 2017 17:46:32 -0000 Message-Id: <8cd13ce08c174410b6701e3967076942@git.apache.org> In-Reply-To: <8b83471d06d34057a5c381b7f39623a2@git.apache.org> References: <8b83471d06d34057a5c381b7f39623a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/22] couch commit: updated refs/heads/COUCHDB-3287-pluggable-storage-engines to 327c053 archived-at: Tue, 21 Feb 2017 17:46:22 -0000 http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl index 1970b78..37e77cd 100644 --- a/src/couch_db_updater.erl +++ b/src/couch_db_updater.erl @@ -14,74 +14,37 @@ -behaviour(gen_server). -vsn(1). --export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]). --export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]). --export([make_doc_summary/2]). +-export([make_doc_summary/2, add_sizes/3, upgrade_sizes/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -include_lib("couch/include/couch_db.hrl"). -include("couch_db_int.hrl"). --record(comp_header, { - db_header, - meta_state -}). --record(merge_st, { - id_tree, - seq_tree, - curr, - rem_seqs, - infos -}). -init({DbName, Filepath, Fd, Options}) -> +init({Engine, DbName, FilePath, Options0}) -> erlang:put(io_priority, {db_update, DbName}), - case lists:member(create, Options) of - true -> - % create a new header and writes it to the file - Header = couch_db_header:new(), - ok = couch_file:write_header(Fd, Header), - % delete any old compaction files that might be hanging around - RootDir = config:get("couchdb", "database_dir", "."), - couch_file:delete(RootDir, Filepath ++ ".compact"), - couch_file:delete(RootDir, Filepath ++ ".compact.data"), - couch_file:delete(RootDir, Filepath ++ ".compact.meta"); - false -> - case couch_file:read_header(Fd) of - {ok, Header} -> - ok; - no_valid_header -> - % create a new header and writes it to the file - Header = couch_db_header:new(), - ok = couch_file:write_header(Fd, Header), - % delete any old compaction files that might be hanging around - file:delete(Filepath ++ ".compact"), - file:delete(Filepath ++ ".compact.data"), - file:delete(Filepath ++ ".compact.meta") - end - end, - Db = init_db(DbName, Filepath, Fd, Header, Options), - case lists:member(sys_db, Options) of - false -> - couch_stats_process_tracker:track([couchdb, open_databases]); - true -> - ok - end, - % we don't load validation funs here because the fabric query is liable to - % race conditions. Instead see couch_db:validate_doc_update, which loads - % them lazily - {ok, Db#db{main_pid = self()}}. + DefaultSecObj = default_security_object(DbName), + Options = [{default_security_object, DefaultSecObj} | Options0], + try + {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options), + Db = init_db(DbName, FilePath, EngineState, Options), + maybe_track_db(Db), + % we don't load validation funs here because the fabric query is liable to + % race conditions. Instead see couch_db:validate_doc_update, which loads + % them lazily + NewDb = Db#db{main_pid = self()}, + proc_lib:init_ack({ok, NewDb}), + gen_server:enter_loop(?MODULE, [], NewDb) + catch + throw:InitError -> + proc_lib:init_ack(InitError) + end. -terminate(_Reason, Db) -> - % If the reason we died is because our fd disappeared - % then we don't need to try closing it again. - if Db#db.fd_monitor == closed -> ok; true -> - ok = couch_file:close(Db#db.fd) - end, +terminate(Reason, Db) -> couch_util:shutdown_sync(Db#db.compactor_pid), - couch_util:shutdown_sync(Db#db.fd), + couch_db_engine:terminate(Reason, Db), ok. handle_call(get_db, _From, Db) -> @@ -105,28 +68,21 @@ handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) -> handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> unlink(Pid), exit(Pid, kill), - RootDir = config:get("couchdb", "database_dir", "."), - ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"), + couch_server:delete_compaction_files(Db#db.name), Db2 = Db#db{compactor_pid = nil}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2}; -handle_call(increment_update_seq, _From, Db) -> - Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - couch_event:notify(Db#db.name, updated), - {reply, {ok, Db2#db.update_seq}, Db2}; -handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) -> - {ok, Ptr, _} = couch_file:append_term( - Db#db.fd, NewSec, [{compression, Comp}]), - Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr, - update_seq=Db#db.update_seq+1}), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {reply, ok, Db2}; +handle_call({set_security, NewSec}, _From, #db{} = Db) -> + {ok, NewDb} = couch_db_engine:set_security(Db, NewSec), + NewSecDb = NewDb#db{ + security = NewSec + }, + ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity), + {reply, ok, NewSecDb}; handle_call({set_revs_limit, Limit}, _From, Db) -> - Db2 = commit_data(Db#db{revs_limit=Limit, - update_seq=Db#db.update_seq+1}), + {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2}; @@ -134,73 +90,78 @@ handle_call({purge_docs, _IdRevs}, _From, #db{compactor_pid=Pid}=Db) when Pid /= nil -> {reply, {error, purge_during_compaction}, Db}; handle_call({purge_docs, IdRevs}, _From, Db) -> - #db{ - fd = Fd, - id_tree = DocInfoByIdBTree, - seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq, - header = Header, - compression = Comp - } = Db, - DocLookups = couch_btree:lookup(DocInfoByIdBTree, - [Id || {Id, _Revs} <- IdRevs]), - - NewDocInfos = lists:zipwith( - fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) -> + DocIds = [Id || {Id, _Revs} <- IdRevs], + OldDocInfos = couch_db_engine:open_docs(Db, DocIds), + + NewDocInfos = lists:flatmap(fun + ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) -> case couch_key_tree:remove_leafs(Tree, Revs) of - {_, []=_RemovedRevs} -> % no change - nil; - {NewTree, RemovedRevs} -> - {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs} + {_, [] = _RemovedRevs} -> % no change + []; + {NewTree, RemovedRevs} -> + NewFDI = FDI#full_doc_info{rev_tree = NewTree}, + [{FDI, NewFDI, RemovedRevs}] end; - (_, not_found) -> - nil + ({_, not_found}) -> + [] + end, lists:zip(IdRevs, OldDocInfos)), + + InitUpdateSeq = couch_db_engine:get_update_seq(Db), + InitAcc = {InitUpdateSeq, [], []}, + FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) -> + #full_doc_info{ + id = Id, + rev_tree = OldTree + } = OldFDI, + {SeqAcc0, FDIAcc, IdRevsAcc} = Acc, + + {NewFDIAcc, NewSeqAcc} = case OldTree of + [] -> + % If we purged every #leaf{} in the doc record + % then we're removing it completely from the + % database. + FDIAcc; + _ -> + % Its possible to purge the #leaf{} that contains + % the update_seq where this doc sits in the update_seq + % sequence. Rather than do a bunch of complicated checks + % we just re-label every #leaf{} and reinsert it into + % the update_seq sequence. + {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun + (_RevId, Leaf, leaf, InnerSeqAcc) -> + {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1}; + (_RevId, Value, _Type, InnerSeqAcc) -> + {Value, InnerSeqAcc} + end, SeqAcc0, OldTree), + + NewFDI = OldFDI#full_doc_info{ + update_seq = SeqAcc1, + rev_tree = NewTree + }, + + {[NewFDI | FDIAcc], SeqAcc1} end, - IdRevs, DocLookups), - - SeqsToRemove = [Seq - || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos], - - FullDocInfoToUpdate = [FullInfo - || {#full_doc_info{rev_tree=Tree}=FullInfo,_} - <- NewDocInfos, Tree /= []], - - IdRevsPurged = [{Id, Revs} - || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], - - {DocInfoToUpdate, NewSeq} = lists:mapfoldl( - fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) -> - Tree2 = couch_key_tree:map_leafs( - fun(_RevId, Leaf) -> - Leaf#leaf{seq=SeqAcc+1} - end, Tree), - {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1} - end, LastSeq, FullDocInfoToUpdate), - - IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_} - <- NewDocInfos], - - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, - DocInfoToUpdate, SeqsToRemove), - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, - FullDocInfoToUpdate, IdsToRemove), - {ok, Pointer, _} = couch_file:append_term( - Fd, IdRevsPurged, [{compression, Comp}]), - - NewHeader = couch_db_header:set(Header, [ - {purge_seq, couch_db_header:purge_seq(Header) + 1}, - {purged_docs, Pointer} - ]), - Db2 = commit_data( - Db#db{ - id_tree = DocInfoByIdBTree2, - seq_tree = DocInfoBySeqBTree2, - update_seq = NewSeq + 1, - header=NewHeader}), + NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc], + {NewSeqAcc, NewFDIAcc, NewIdRevsAcc} + end, InitAcc, NewDocInfos), + + {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc, + + % We need to only use the list of #full_doc_info{} records + % that we have actually changed due to a purge. + PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos], + Pairs = pair_purge_info(PreviousFDIs, FDIs), + + {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_event:notify(Db#db.name, updated), - {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}. + + PurgeSeq = couch_db_engine:get_purge_seq(Db2), + {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2}; + +handle_call(Msg, From, Db) -> + couch_db_engine:handle_call(Msg, From, Db). handle_cast({load_validation_funs, ValidationFuns}, Db) -> @@ -209,65 +170,29 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) -> {noreply, Db2}; handle_cast(start_compact, Db) -> case Db#db.compactor_pid of - nil -> - couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]), - Pid = spawn_link(fun() -> start_copy_compact(Db) end), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2}; - _ -> - % compact currently running, this is a no-op - {noreply, Db} - end; -handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> - {ok, NewFd} = couch_file:open(CompactFilepath), - {ok, NewHeader0} = couch_file:read_header(NewFd), - NewHeader = couch_db_header:set(NewHeader0, [ - {compacted_seq, Db#db.update_seq} - ]), - #db{update_seq=NewSeq} = NewDb = - init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options), - unlink(NewFd), - case Db#db.update_seq == NewSeq of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs), - - NewDb2 = commit_data(NewDb#db{ - local_tree = NewLocalBtree, - main_pid = self(), - filepath = Filepath, - instance_start_time = Db#db.instance_start_time, - revs_limit = Db#db.revs_limit - }), - - couch_log:debug("CouchDB swapping files ~s and ~s.", - [Filepath, CompactFilepath]), - ok = file:rename(CompactFilepath, Filepath ++ ".compact"), - RootDir = config:get("couchdb", "database_dir", "."), - couch_file:delete(RootDir, Filepath), - ok = file:rename(Filepath ++ ".compact", Filepath), - % Delete the old meta compaction file after promoting - % the compaction file. - couch_file:delete(RootDir, Filepath ++ ".compact.meta"), - close_db(Db), - NewDb3 = refresh_validate_doc_funs(NewDb2), - ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), - couch_event:notify(NewDb3#db.name, compacted), - couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]), - {noreply, NewDb3#db{compactor_pid=nil}}; - false -> - couch_log:info("Compaction file still behind main file " - "(update seq=~p. compact update seq=~p). Retrying.", - [Db#db.update_seq, NewSeq]), - close_db(NewDb), - Pid = spawn_link(fun() -> start_copy_compact(Db) end), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2} + nil -> + % For now we only support compacting to the same + % storage engine. After the first round of patches + % we'll add a field that sets the target engine + % type to compact to with a new copy compactor. + UpdateSeq = couch_db_engine:get_update_seq(Db), + Args = [Db#db.name, UpdateSeq], + couch_log:info("Starting compaction for db \"~s\" at ~p", Args), + {ok, Db2} = couch_db_engine:start_compaction(Db), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2}; + _ -> + % compact currently running, this is a no-op + {noreply, Db} end; +handle_cast({compact_done, CompactEngine, CompactInfo}, #db{} = OldDb) -> + {ok, NewDb} = case couch_db_engine:get_engine(OldDb) of + CompactEngine -> + couch_db_engine:finish_compaction(OldDb, CompactInfo); + _ -> + finish_engine_swap(OldDb, CompactEngine, CompactInfo) + end, + {noreply, NewDb}; handle_cast(Msg, #db{name = Name} = Db) -> couch_log:error("Database `~s` updater received unexpected cast: ~p", @@ -291,9 +216,9 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, FullCommit2) of {ok, Db2, UpdatedDDocIds} -> ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - if Db2#db.update_seq /= Db#db.update_seq -> - couch_event:notify(Db2#db.name, updated); - true -> ok + case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of + {Seq, Seq} -> ok; + _ -> couch_event:notify(Db2#db.name, updated) end, if NonRepDocs2 /= [] -> couch_event:notify(Db2#db.name, local_updated); @@ -336,9 +261,8 @@ handle_info({'EXIT', _Pid, normal}, Db) -> {noreply, Db}; handle_info({'EXIT', _Pid, Reason}, Db) -> {stop, Reason, Db}; -handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> - couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]), - {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}. +handle_info(Msg, Db) -> + couch_db_engine:handle_info(Msg, Db). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -389,235 +313,32 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> {GroupedDocsAcc, ClientsAcc, FullCommit} end. -rev_tree(DiskTree) -> - couch_key_tree:map(fun - (_RevId, {Del, Ptr, Seq}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq - }; - (_RevId, {Del, Ptr, Seq, Size}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq, - sizes = upgrade_sizes(Size) - }; - (_RevId, {Del, Ptr, Seq, Sizes, Atts}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq, - sizes = upgrade_sizes(Sizes), - atts = Atts - }; - (_RevId, ?REV_MISSING) -> - ?REV_MISSING - end, DiskTree). - -disk_tree(RevTree) -> - couch_key_tree:map(fun - (_RevId, ?REV_MISSING) -> - ?REV_MISSING; - (_RevId, #leaf{} = Leaf) -> - #leaf{ - deleted = Del, - ptr = Ptr, - seq = Seq, - sizes = Sizes, - atts = Atts - } = Leaf, - {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts} - end, RevTree). - -upgrade_sizes(#size_info{}=SI) -> - SI; -upgrade_sizes({D, E}) -> - #size_info{active=D, external=E}; -upgrade_sizes(S) when is_integer(S) -> - #size_info{active=S, external=0}. - -split_sizes(#size_info{}=SI) -> - {SI#size_info.active, SI#size_info.external}. - -join_sizes({Active, External}) when is_integer(Active), is_integer(External) -> - #size_info{active=Active, external=External}. - -btree_by_seq_split(#full_doc_info{}=Info) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = Del, - sizes = SizeInfo, - rev_tree = Tree - } = Info, - {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}. - -btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) -> - btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree}); -btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = ?i2b(Del), - sizes = join_sizes(Sizes), - rev_tree = rev_tree(DiskTree) - }; -btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> - % Older versions stored #doc_info records in the seq_tree. - % Compact to upgrade. - #doc_info{ - id = Id, - high_seq=KeySeq, - revs = - [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} || - {Rev, Seq, Bp} <- RevInfos] ++ - [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || - {Rev, Seq, Bp} <- DeletedRevInfos]}. - -btree_by_id_split(#full_doc_info{}=Info) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = Deleted, - sizes = SizeInfo, - rev_tree = Tree - } = Info, - {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}. - -% Handle old formats before data_size was added -btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> - btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree}); - -btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) -> - #full_doc_info{ - id = Id, - update_seq = HighSeq, - deleted = ?i2b(Deleted), - sizes = upgrade_sizes(Sizes), - rev_tree = rev_tree(DiskTree) - }. -btree_by_id_reduce(reduce, FullDocInfos) -> - lists:foldl( - fun(Info, {NotDeleted, Deleted, Sizes}) -> - Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes), - case Info#full_doc_info.deleted of - true -> - {NotDeleted, Deleted + 1, Sizes2}; - false -> - {NotDeleted + 1, Deleted, Sizes2} - end - end, - {0, 0, #size_info{}}, FullDocInfos); -btree_by_id_reduce(rereduce, Reds) -> - lists:foldl( - fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) -> - % pre 1.2 format, will be upgraded on compaction - {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil}; - ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) -> - AccSizes2 = reduce_sizes(AccSizes, Sizes), - {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2} - end, - {0, 0, #size_info{}}, Reds). - -reduce_sizes(nil, _) -> - nil; -reduce_sizes(_, nil) -> - nil; -reduce_sizes(#size_info{}=S1, #size_info{}=S2) -> - #size_info{ - active = S1#size_info.active + S2#size_info.active, - external = S1#size_info.external + S2#size_info.external - }; -reduce_sizes(S1, S2) -> - reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)). - -btree_by_seq_reduce(reduce, DocInfos) -> - % count the number of documents - length(DocInfos); -btree_by_seq_reduce(rereduce, Reds) -> - lists:sum(Reds). - -init_db(DbName, Filepath, Fd, Header0, Options) -> - Header = couch_db_header:upgrade(Header0), - - {ok, FsyncOptions} = couch_util:parse_term( - config:get("couchdb", "fsync_options", - "[before_header, after_header, on_file_open]")), - - case lists:member(on_file_open, FsyncOptions) of - true -> ok = couch_file:sync(Fd); - _ -> ok - end, - - Compression = couch_compress:get_compression_method(), - - IdTreeState = couch_db_header:id_tree_state(Header), - SeqTreeState = couch_db_header:seq_tree_state(Header), - LocalTreeState = couch_db_header:local_tree_state(Header), - {ok, IdBtree} = couch_btree:open(IdTreeState, Fd, - [{split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2}, - {compression, Compression}]), - {ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd, - [{split, fun ?MODULE:btree_by_seq_split/1}, - {join, fun ?MODULE:btree_by_seq_join/2}, - {reduce, fun ?MODULE:btree_by_seq_reduce/2}, - {compression, Compression}]), - {ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd, - [{compression, Compression}]), - case couch_db_header:security_ptr(Header) of - nil -> - Security = default_security_object(DbName), - SecurityPtr = nil; - SecurityPtr -> - {ok, Security} = couch_file:pread_term(Fd, SecurityPtr) - end, +init_db(DbName, FilePath, EngineState, Options) -> % convert start time tuple to microsecs and store as a binary string {MegaSecs, Secs, MicroSecs} = os:timestamp(), StartTime = ?l2b(io_lib:format("~p", [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])), - ok = couch_file:set_db_pid(Fd, self()), - Db = #db{ - fd=Fd, - fd_monitor = erlang:monitor(process, Fd), - header=Header, - id_tree = IdBtree, - seq_tree = SeqBtree, - local_tree = LocalDocsBtree, - committed_update_seq = couch_db_header:update_seq(Header), - update_seq = couch_db_header:update_seq(Header), + + BDU = couch_util:get_value(before_doc_update, Options, nil), + ADR = couch_util:get_value(after_doc_read, Options, nil), + + CleanedOpts = [Opt || Opt <- Options, Opt /= create], + + InitDb = #db{ name = DbName, - filepath = Filepath, - security = Security, - security_ptr = SecurityPtr, + filepath = FilePath, + engine = EngineState, instance_start_time = StartTime, - revs_limit = couch_db_header:revs_limit(Header), - fsync_options = FsyncOptions, - options = Options, - compression = Compression, - before_doc_update = couch_util:get_value(before_doc_update, Options, nil), - after_doc_read = couch_util:get_value(after_doc_read, Options, nil) + options = CleanedOpts, + before_doc_update = BDU, + after_doc_read = ADR }, - % If we just created a new UUID while upgrading a - % database then we want to flush that to disk or - % we risk sending out the uuid and having the db - % crash which would result in it generating a new - % uuid each time it was reopened. - case Header /= Header0 of - true -> - sync_header(Db, Header); - false -> - Db - end. - - -close_db(#db{fd_monitor = Ref}) -> - erlang:demonitor(Ref). + InitDb#db{ + committed_update_seq = couch_db_engine:get_update_seq(InitDb), + security = couch_db_engine:get_security(InitDb) + }. refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) -> @@ -641,50 +362,36 @@ refresh_validate_doc_funs(Db0) -> flush_trees(_Db, [], AccFlushedTrees) -> {ok, lists:reverse(AccFlushedTrees)}; -flush_trees(#db{fd = Fd} = Db, +flush_trees(#db{} = Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, {Flushed, FinalAcc} = couch_key_tree:mapfold( fun(_Rev, Value, Type, SizesAcc) -> case Value of - #doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} -> - {summary, Summary, AttSizeInfo, AttsFd} = DocSummary, - % this node value is actually an unwritten document summary, - % write to disk. - % make sure the Fd in the written bins is the same Fd we are - % and convert bins, removing the FD. - % All bins should have been written to disk already. - case {AttsFd, Fd} of - {nil, _} -> - ok; - {SameFd, SameFd} -> - ok; - _ -> - % Fd where the attachments were written to is not the same - % as our Fd. This can happen when a database is being - % switched out during a compaction. - couch_log:debug("File where the attachments are written has" - " changed. Possibly retrying.", []), - throw(retry) - end, - ExternalSize = ?term_size(Summary), - {ok, NewSummaryPointer, SummarySize} = - couch_file:append_raw_chunk(Fd, Summary), - Leaf = #leaf{ - deleted = IsDeleted, - ptr = NewSummaryPointer, - seq = UpdateSeq, - sizes = #size_info{ - active = SummarySize, - external = ExternalSize + % This node is a document summary that needs to be + % flushed to disk. + #doc{} = Doc -> + check_doc_atts(Db, Doc), + ExternalSize = ?term_size(Doc#doc.body), + {size_info, AttSizeInfo} = + lists:keyfind(size_info, 1, Doc#doc.meta), + {ok, NewDoc, WrittenSize} = + couch_db_engine:write_doc_body(Db, Doc), + Leaf = #leaf{ + deleted = Doc#doc.deleted, + ptr = NewDoc#doc.body, + seq = UpdateSeq, + sizes = #size_info{ + active = WrittenSize, + external = ExternalSize + }, + atts = AttSizeInfo }, - atts = AttSizeInfo - }, - {Leaf, add_sizes(Type, Leaf, SizesAcc)}; - #leaf{} -> - {Value, add_sizes(Type, Value, SizesAcc)}; - _ -> - {Value, SizesAcc} + {Leaf, add_sizes(Type, Leaf, SizesAcc)}; + #leaf{} -> + {Value, add_sizes(Type, Value, SizesAcc)}; + _ -> + {Value, SizesAcc} end end, {0, 0, []}, Unflushed), {FinalAS, FinalES, FinalAtts} = FinalAcc, @@ -698,6 +405,29 @@ flush_trees(#db{fd = Fd} = Db, }, flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]). + +check_doc_atts(Db, Doc) -> + {atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta), + % Make sure that the attachments were written to the currently + % active attachment stream. If compaction swaps during a write + % request we may have to rewrite our attachment bodies. + if Stream == nil -> ok; true -> + case couch_db:is_active_stream(Db, Stream) of + true -> + ok; + false -> + % Stream where the attachments were written to is + % no longer the current attachment stream. This + % can happen when a database is switched at + % compaction time. + couch_log:debug("Stream where the attachments were" + " written has changed." + " Possibly retrying.", []), + throw(retry) + end + end. + + add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) -> % Maybe upgrade from disk_size only #size_info{ @@ -710,6 +440,15 @@ add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) -> NewAttsAcc = lists:umerge(AttSizes, AttsAcc), {NewASAcc, NewESAcc, NewAttsAcc}. + +upgrade_sizes(#size_info{}=SI) -> + SI; +upgrade_sizes({D, E}) -> + #size_info{active=D, external=E}; +upgrade_sizes(S) when is_integer(S) -> + #size_info{active=S, external=0}. + + send_result(Client, Doc, NewResult) -> % used to send a result to the client catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}). @@ -836,58 +575,40 @@ merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) -> {NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit), OldInfo#full_doc_info{rev_tree = NewTree}. -stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> - [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || - #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. +update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) -> + UpdateSeq = couch_db_engine:get_update_seq(Db), + RevsLimit = couch_db_engine:get_revs_limit(Db), -update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> - #db{ - id_tree = DocInfoByIdBTree, - seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq, - revs_limit = RevsLimit - } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. - OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), - OldDocInfos = lists:zipwith( - fun(_Id, {ok, FullDocInfo}) -> - FullDocInfo; + OldDocLookups = couch_db_engine:open_docs(Db, Ids), + OldDocInfos = lists:zipwith(fun + (_Id, #full_doc_info{} = FDI) -> + FDI; (Id, not_found) -> #full_doc_info{id=Id} - end, - Ids, OldDocLookups), + end, Ids, OldDocLookups), % Merge the new docs into the revision trees. - {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit, - MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), - - % All documents are now ready to write. - - {ok, Db2} = update_local_docs(Db, NonRepDocs), + {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit, + MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq), % Write out the document summaries (the bodies are stored in the nodes of % the trees, the attachments are already written to disk) - {ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []), + {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []), + Pairs = pair_write_info(OldDocLookups, IndexFDIs), + LocalDocs2 = update_local_doc_revs(LocalDocs), - % and the indexes - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []), - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs), + {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []), - - WriteCount = length(IndexFullDocInfos), + WriteCount = length(IndexFDIs), couch_stats:increment_counter([couchdb, document_inserts], - WriteCount - length(RemoveSeqs)), + WriteCount - length(RemSeqs)), couch_stats:increment_counter([couchdb, document_writes], WriteCount), couch_stats:increment_counter( [couchdb, local_document_writes], - length(NonRepDocs) + length(LocalDocs2) ), - Db3 = Db2#db{ - id_tree = DocInfoByIdBTree2, - seq_tree = DocInfoBySeqBTree2, - update_seq = NewSeq}, - % Check if we just updated any design documents, and update the validation % funs if we did. UpdatedDDocIds = lists:flatmap(fun @@ -895,549 +616,97 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> (_) -> [] end, Ids), - Db4 = case length(UpdatedDDocIds) > 0 of + Db2 = case length(UpdatedDDocIds) > 0 of true -> - couch_event:notify(Db3#db.name, ddoc_updated), - ddoc_cache:evict(Db3#db.name, UpdatedDDocIds), - refresh_validate_doc_funs(Db3); + couch_event:notify(Db1#db.name, ddoc_updated), + ddoc_cache:evict(Db1#db.name, UpdatedDDocIds), + refresh_validate_doc_funs(Db1); false -> - Db3 + Db1 end, - {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}. - -update_local_docs(Db, []) -> - {ok, Db}; -update_local_docs(#db{local_tree=Btree}=Db, Docs) -> - BtreeEntries = lists:map( - fun({Client, NewDoc}) -> - #doc{ - id = Id, - deleted = Delete, - revs = {0, PrevRevs}, - body = Body - } = NewDoc, - case PrevRevs of - [RevStr|_] -> + {ok, commit_data(Db2, not FullCommit), UpdatedDDocIds}. + + +update_local_doc_revs(Docs) -> + lists:map(fun({Client, NewDoc}) -> + #doc{ + deleted = Delete, + revs = {0, PrevRevs} + } = NewDoc, + case PrevRevs of + [RevStr | _] -> PrevRev = list_to_integer(?b2l(RevStr)); [] -> PrevRev = 0 - end, - case Delete of - false -> - send_result(Client, NewDoc, {ok, - {0, ?l2b(integer_to_list(PrevRev + 1))}}), - {update, {Id, {PrevRev + 1, Body}}}; - true -> - send_result(Client, NewDoc, - {ok, {0, <<"0">>}}), - {remove, Id} - end - end, Docs), - - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], - - {ok, Btree2} = - couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - - {ok, Db#db{local_tree = Btree2}}. + end, + NewRev = case Delete of + false -> + ?l2b(integer_to_list(PrevRev + 1)); + true -> + <<"0">> + end, + send_result(Client, NewDoc, {ok, {0, NewRev}}), + NewDoc#doc{ + revs = {0, [NewRev]} + } + end, Docs). -db_to_header(Db, Header) -> - couch_db_header:set(Header, [ - {update_seq, Db#db.update_seq}, - {seq_tree_state, couch_btree:get_state(Db#db.seq_tree)}, - {id_tree_state, couch_btree:get_state(Db#db.id_tree)}, - {local_tree_state, couch_btree:get_state(Db#db.local_tree)}, - {security_ptr, Db#db.security_ptr}, - {revs_limit, Db#db.revs_limit} - ]). commit_data(Db) -> commit_data(Db, false). -commit_data(#db{waiting_delayed_commit=nil} = Db, true) -> - TRef = erlang:send_after(1000,self(),delayed_commit), - Db#db{waiting_delayed_commit=TRef}; +commit_data(#db{waiting_delayed_commit = nil} = Db, true) -> + TRef = erlang:send_after(1000, self(), delayed_commit), + Db#db{waiting_delayed_commit = TRef}; commit_data(Db, true) -> Db; commit_data(Db, _) -> #db{ - header = OldHeader, waiting_delayed_commit = Timer } = Db, if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, - case db_to_header(Db, OldHeader) of - OldHeader -> Db#db{waiting_delayed_commit=nil}; - NewHeader -> sync_header(Db, NewHeader) - end. - -sync_header(Db, NewHeader) -> - #db{ - fd = Fd, - filepath = FilePath, - fsync_options = FsyncOptions, - waiting_delayed_commit = Timer - } = Db, - - if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, - - Before = lists:member(before_header, FsyncOptions), - After = lists:member(after_header, FsyncOptions), - - if Before -> couch_file:sync(FilePath); true -> ok end, - ok = couch_file:write_header(Fd, NewHeader), - if After -> couch_file:sync(FilePath); true -> ok end, - - Db#db{ - header=NewHeader, - committed_update_seq=Db#db.update_seq, - waiting_delayed_commit=nil + {ok, Db1} = couch_db_engine:commit_data(Db), + Db1#db{ + waiting_delayed_commit = nil, + committed_update_seq = couch_db_engine:get_update_seq(Db) }. -copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) -> - {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp), - BinInfos = case BinInfos0 of - _ when is_binary(BinInfos0) -> - couch_compress:decompress(BinInfos0); - _ when is_list(BinInfos0) -> - % pre 1.2 file format - BinInfos0 - end, - % copy the bin values - NewBinInfos = lists:map( - fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) -> - % 010 UPGRADE CODE - {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} = - couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - check_md5(ExpectedMd5, ActualMd5), - {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity}; - ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) -> - {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} = - couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - check_md5(ExpectedMd5, ActualMd5), - Enc = case Enc1 of - true -> - % 0110 UPGRADE CODE - gzip; - false -> - % 0110 UPGRADE CODE - identity; - _ -> - Enc1 - end, - {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc} - end, BinInfos), - {BodyData, NewBinInfos}. - -merge_lookups(Infos, []) -> - Infos; -merge_lookups([], _) -> - []; -merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) -> - % Assert we've matched our lookups - if DI#doc_info.id == FDI#full_doc_info.id -> ok; true -> - erlang:error({mismatched_doc_infos, DI#doc_info.id}) - end, - [FDI | merge_lookups(RestInfos, RestLookups)]; -merge_lookups([FDI | RestInfos], Lookups) -> - [FDI | merge_lookups(RestInfos, Lookups)]. - -check_md5(Md5, Md5) -> ok; -check_md5(_, _) -> throw(md5_mismatch). - -copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) -> - DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], - LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), - % COUCHDB-968, make sure we prune duplicates during compaction - NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) -> - A =< B - end, merge_lookups(MixedInfos, LookupResults)), - - NewInfos1 = lists:map(fun(Info) -> - {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun - (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) -> - {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd), - SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}), - ExternalSize = ?term_size(SummaryChunk), - {ok, Pos, SummarySize} = couch_file:append_raw_chunk( - DestFd, SummaryChunk), - AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos], - NewLeaf = Leaf#leaf{ - ptr = Pos, - sizes = #size_info{ - active = SummarySize, - external = ExternalSize - }, - atts = AttSizes - }, - {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)}; - (_Rev, _Leaf, branch, SizesAcc) -> - {?REV_MISSING, SizesAcc} - end, {0, 0, []}, Info#full_doc_info.rev_tree), - {FinalAS, FinalES, FinalAtts} = FinalAcc, - TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), - NewActiveSize = FinalAS + TotalAttSize, - NewExternalSize = FinalES + TotalAttSize, - Info#full_doc_info{ - rev_tree = NewRevTree, - sizes = #size_info{ - active = NewActiveSize, - external = NewExternalSize - } - } - end, NewInfos0), - - NewInfos = stem_full_doc_infos(Db, NewInfos1), - RemoveSeqs = - case Retry of - nil -> - []; - OldDocIdTree -> - % Compaction is being rerun to catch up to writes during the - % first pass. This means we may have docs that already exist - % in the seq_tree in the .data file. Here we lookup any old - % update_seqs so that they can be removed. - Ids = [Id || #full_doc_info{id=Id} <- NewInfos], - Existing = couch_btree:lookup(OldDocIdTree, Ids), - [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing] - end, - {ok, SeqTree} = couch_btree:add_remove( - NewDb#db.seq_tree, NewInfos, RemoveSeqs), - - FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) -> - {{Id, Seq}, FDI} - end, NewInfos), - {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs), - update_compact_task(length(NewInfos)), - NewDb#db{id_tree=IdEms, seq_tree=SeqTree}. - - -copy_compact(Db, NewDb0, Retry) -> - Compression = couch_compress:get_compression_method(), - NewDb = NewDb0#db{compression=Compression}, - TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), - BufferSize = list_to_integer( - config:get("database_compaction", "doc_buffer_size", "524288")), - CheckpointAfter = couch_util:to_integer( - config:get("database_compaction", "checkpoint_after", - BufferSize * 10)), - - EnumBySeqFun = - fun(DocInfo, _Offset, - {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) -> - - Seq = case DocInfo of - #full_doc_info{} -> DocInfo#full_doc_info.update_seq; - #doc_info{} -> DocInfo#doc_info.high_seq - end, - - AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo), - if AccUncopiedSize2 >= BufferSize -> - NewDb2 = copy_docs( - Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), - AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, - if AccCopiedSize2 >= CheckpointAfter -> - CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}), - {ok, {CommNewDb2, [], 0, 0}}; - true -> - {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}} - end; +maybe_track_db(#db{options = Options}) -> + case lists:member(sys_db, Options) of true -> - {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2, - AccCopiedSize}} - end - end, - - TaskProps0 = [ - {type, database_compaction}, - {database, Db#db.name}, - {progress, 0}, - {changes_done, 0}, - {total_changes, TotalChanges} - ], - case (Retry =/= nil) and couch_task_status:is_task_added() of - true -> - couch_task_status:update([ - {retry, true}, - {progress, 0}, - {changes_done, 0}, - {total_changes, TotalChanges} - ]); - false -> - couch_task_status:add_task(TaskProps0), - couch_task_status:set_update_frequency(500) - end, - - {ok, _, {NewDb2, Uncopied, _, _}} = - couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun, - {NewDb, [], 0, 0}, - [{start_key, NewDb#db.update_seq + 1}]), - - NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), - - % copy misc header values - if NewDb3#db.security /= Db#db.security -> - {ok, Ptr, _} = couch_file:append_term( - NewDb3#db.fd, Db#db.security, - [{compression, NewDb3#db.compression}]), - NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr}; - true -> - NewDb4 = NewDb3 - end, - - commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}). - - -start_copy_compact(#db{}=Db) -> - erlang:put(io_priority, {db_compact, Db#db.name}), - #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db, - couch_log:debug("Compaction process spawned for db \"~s\"", [Name]), - - {ok, NewDb, DName, DFd, MFd, Retry} = - open_compaction_files(Name, Header, Filepath, Options), - erlang:monitor(process, MFd), - - % This is a bit worrisome. init_db/4 will monitor the data fd - % but it doesn't know about the meta fd. For now I'll maintain - % that the data fd is the old normal fd and meta fd is special - % and hope everything works out for the best. - unlink(DFd), - - NewDb1 = copy_purge_info(Db, NewDb), - NewDb2 = copy_compact(Db, NewDb1, Retry), - NewDb3 = sort_meta_data(NewDb2), - NewDb4 = commit_compaction_data(NewDb3), - NewDb5 = copy_meta_data(NewDb4), - NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)), - close_db(NewDb6), - - ok = couch_file:close(MFd), - gen_server:cast(Db#db.main_pid, {compact_done, DName}). - - -open_compaction_files(DbName, SrcHdr, DbFilePath, Options) -> - DataFile = DbFilePath ++ ".compact.data", - MetaFile = DbFilePath ++ ".compact.meta", - {ok, DataFd, DataHdr} = open_compaction_file(DataFile), - {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), - DataHdrIsDbHdr = couch_db_header:is_header(DataHdr), - case {DataHdr, MetaHdr} of - {#comp_header{}=A, #comp_header{}=A} -> - DbHeader = A#comp_header.db_header, - Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options), - Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state), - {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; - _ when DataHdrIsDbHdr -> - ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)), - Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options), - Db1 = bind_emsort(Db0, MetaFd, nil), - {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; - _ -> - Header = couch_db_header:from(SrcHdr), - ok = reset_compaction_file(DataFd, Header), - ok = reset_compaction_file(MetaFd, Header), - Db0 = init_db(DbName, DataFile, DataFd, Header, Options), - Db1 = bind_emsort(Db0, MetaFd, nil), - {ok, Db1, DataFile, DataFd, MetaFd, nil} - end. - - -open_compaction_file(FilePath) -> - case couch_file:open(FilePath, [nologifmissing]) of - {ok, Fd} -> - case couch_file:read_header(Fd) of - {ok, Header} -> {ok, Fd, Header}; - no_valid_header -> {ok, Fd, nil} - end; - {error, enoent} -> - {ok, Fd} = couch_file:open(FilePath, [create]), - {ok, Fd, nil} + ok; + false -> + couch_stats_process_tracker:track([couchdb, open_databases]) end. -reset_compaction_file(Fd, Header) -> - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, Header). - - -copy_purge_info(OldDb, NewDb) -> - OldHdr = OldDb#db.header, - NewHdr = NewDb#db.header, - OldPurgeSeq = couch_db_header:purge_seq(OldHdr), - if OldPurgeSeq > 0 -> - {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb), - Opts = [{compression, NewDb#db.compression}], - {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts), - NewNewHdr = couch_db_header:set(NewHdr, [ - {purge_seq, OldPurgeSeq}, - {purged_docs, Ptr} - ]), - NewDb#db{header = NewNewHdr}; - true -> - NewDb - end. +finish_engine_swap(_OldDb, _NewEngine, _CompactFilePath) -> + erlang:error(explode). -commit_compaction_data(#db{}=Db) -> - % Compaction needs to write headers to both the data file - % and the meta file so if we need to restart we can pick - % back up from where we left off. - commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)), - commit_compaction_data(Db, Db#db.fd). - - -commit_compaction_data(#db{header=OldHeader}=Db0, Fd) -> - % Mostly copied from commit_data/2 but I have to - % replace the logic to commit and fsync to a specific - % fd instead of the Filepath stuff that commit_data/2 - % does. - DataState = couch_db_header:id_tree_state(OldHeader), - MetaFd = couch_emsort:get_fd(Db0#db.id_tree), - MetaState = couch_emsort:get_state(Db0#db.id_tree), - Db1 = bind_id_tree(Db0, Db0#db.fd, DataState), - Header = db_to_header(Db1, OldHeader), - CompHeader = #comp_header{ - db_header = Header, - meta_state = MetaState - }, - ok = couch_file:sync(Fd), - ok = couch_file:write_header(Fd, CompHeader), - Db2 = Db1#db{ - waiting_delayed_commit=nil, - header=Header, - committed_update_seq=Db1#db.update_seq - }, - bind_emsort(Db2, MetaFd, MetaState). - - -bind_emsort(Db, Fd, nil) -> - {ok, Ems} = couch_emsort:open(Fd), - Db#db{id_tree=Ems}; -bind_emsort(Db, Fd, State) -> - {ok, Ems} = couch_emsort:open(Fd, [{root, State}]), - Db#db{id_tree=Ems}. - - -bind_id_tree(Db, Fd, State) -> - {ok, IdBtree} = couch_btree:open(State, Fd, [ - {split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2} - ]), - Db#db{id_tree=IdBtree}. - - -sort_meta_data(Db0) -> - {ok, Ems} = couch_emsort:merge(Db0#db.id_tree), - Db0#db{id_tree=Ems}. - - -copy_meta_data(#db{fd=Fd, header=Header}=Db) -> - Src = Db#db.id_tree, - DstState = couch_db_header:id_tree_state(Header), - {ok, IdTree0} = couch_btree:open(DstState, Fd, [ - {split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2} - ]), - {ok, Iter} = couch_emsort:iter(Src), - Acc0 = #merge_st{ - id_tree=IdTree0, - seq_tree=Db#db.seq_tree, - rem_seqs=[], - infos=[] - }, - Acc = merge_docids(Iter, Acc0), - {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos), - {ok, SeqTree} = couch_btree:add_remove( - Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs - ), - Db#db{id_tree=IdTree, seq_tree=SeqTree}. - - -merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 -> - #merge_st{ - id_tree=IdTree0, - seq_tree=SeqTree0, - rem_seqs=RemSeqs - } = Acc, - {ok, IdTree1} = couch_btree:add(IdTree0, Infos), - {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs), - Acc1 = Acc#merge_st{ - id_tree=IdTree1, - seq_tree=SeqTree1, - rem_seqs=[], - infos=[] - }, - merge_docids(Iter, Acc1); -merge_docids(Iter, #merge_st{curr=Curr}=Acc) -> - case next_info(Iter, Curr, []) of - {NextIter, NewCurr, FDI, Seqs} -> - Acc1 = Acc#merge_st{ - infos = [FDI | Acc#merge_st.infos], - rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, - curr = NewCurr - }, - merge_docids(NextIter, Acc1); - {finished, FDI, Seqs} -> - Acc#merge_st{ - infos = [FDI | Acc#merge_st.infos], - rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, - curr = undefined - }; - empty -> - Acc - end. - +make_doc_summary(Db, DocParts) -> + couch_db_engine:make_doc_summary(Db, DocParts). -next_info(Iter, undefined, []) -> - case couch_emsort:next(Iter) of - {ok, {{Id, Seq}, FDI}, NextIter} -> - next_info(NextIter, {Id, Seq, FDI}, []); - finished -> - empty - end; -next_info(Iter, {Id, Seq, FDI}, Seqs) -> - case couch_emsort:next(Iter) of - {ok, {{Id, NSeq}, NFDI}, NextIter} -> - next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]); - {ok, {{NId, NSeq}, NFDI}, NextIter} -> - {NextIter, {NId, NSeq, NFDI}, FDI, Seqs}; - finished -> - {finished, FDI, Seqs} - end. +pair_write_info(Old, New) -> + lists:map(fun(FDI) -> + case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of + #full_doc_info{} = OldFDI -> {OldFDI, FDI}; + false -> {not_found, FDI} + end + end, New). -update_compact_task(NumChanges) -> - [Changes, Total] = couch_task_status:get([changes_done, total_changes]), - Changes2 = Changes + NumChanges, - Progress = case Total of - 0 -> - 0; - _ -> - (Changes2 * 100) div Total - end, - couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). +pair_purge_info(Old, New) -> + lists:map(fun(OldFDI) -> + case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of + #full_doc_info{} = NewFDI -> {OldFDI, NewFDI}; + false -> {OldFDI, not_found} + end + end, Old). -make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) -> - Body = case couch_compress:is_compressed(Body0, Comp) of - true -> - Body0; - false -> - % pre 1.2 database file format - couch_compress:compress(Body0, Comp) - end, - Atts = case couch_compress:is_compressed(Atts0, Comp) of - true -> - Atts0; - false -> - couch_compress:compress(Atts0, Comp) - end, - SummaryBin = ?term_to_bin({Body, Atts}), - couch_file:assemble_file_chunk(SummaryBin, couch_crypto:hash(md5, SummaryBin)). default_security_object(<<"shards/", _/binary>>) -> case config:get("couchdb", "default_security", "everyone") of http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_httpd_db.erl ---------------------------------------------------------------------- diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl index fe42dfe..4cf6857 100644 --- a/src/couch_httpd_db.erl +++ b/src/couch_httpd_db.erl @@ -217,7 +217,13 @@ handle_design_info_req(#httpd{ create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) -> ok = couch_httpd:verify_is_server_admin(Req), - case couch_server:create(DbName, [{user_ctx, UserCtx}]) of + Engine = case couch_httpd:qs_value(Req, "engine") of + EngineStr when is_list(EngineStr) -> + [{engine, iolist_to_binary(EngineStr)}]; + _ -> + [] + end, + case couch_server:create(DbName, [{user_ctx, UserCtx}] ++ Engine) of {ok, Db} -> couch_db:close(Db), DbUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_httpd_misc_handlers.erl ---------------------------------------------------------------------- diff --git a/src/couch_httpd_misc_handlers.erl b/src/couch_httpd_misc_handlers.erl index eb75a94..3fc4d9a 100644 --- a/src/couch_httpd_misc_handlers.erl +++ b/src/couch_httpd_misc_handlers.erl @@ -17,8 +17,6 @@ handle_uuids_req/1,handle_config_req/1, handle_task_status_req/1, handle_file_req/2]). --export([increment_update_seq_req/2]). - -include_lib("couch/include/couch_db.hrl"). @@ -310,14 +308,3 @@ handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Re send_json(Req, 200, list_to_binary(OldValue)) end. - -% httpd db handlers - -increment_update_seq_req(#httpd{method='POST'}=Req, Db) -> - couch_httpd:validate_ctype(Req, "application/json"), - {ok, NewSeq} = couch_db:increment_update_seq(Db), - send_json(Req, {[{ok, true}, - {update_seq, NewSeq} - ]}); -increment_update_seq_req(Req, _Db) -> - send_method_not_allowed(Req, "POST"). http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_server.erl ---------------------------------------------------------------------- diff --git a/src/couch_server.erl b/src/couch_server.erl index f365718..2c77e98 100644 --- a/src/couch_server.erl +++ b/src/couch_server.erl @@ -21,6 +21,8 @@ -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). -export([close_lru/0]). +-export([delete_compaction_files/1]). +-export([exists/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -33,6 +35,7 @@ -record(server,{ root_dir = [], + engines = [], max_dbs_open=?MAX_DBS_OPEN, dbs_open=0, start_time="", @@ -118,6 +121,35 @@ create(DbName, Options0) -> delete(DbName, Options) -> gen_server:call(couch_server, {delete, DbName, Options}, infinity). + +exists(DbName) -> + RootDir = config:get("couchdb", "database_dir", "."), + Engines = get_configured_engines(), + Possible = lists:foldl(fun({Extension, Engine}, Acc) -> + Path = make_filepath(RootDir, DbName, Extension), + case couch_db_engine:exists(Engine, Path) of + true -> + [{Engine, Path} | Acc]; + false -> + Acc + end + end, [], Engines), + Possible /= []. + + +delete_compaction_files(DbName) -> + delete_compaction_files(DbName, []). + +delete_compaction_files(DbName, DelOpts) when is_list(DbName) -> + RootDir = config:get("couchdb", "database_dir", "."), + lists:foreach(fun({Ext, Engine}) -> + FPath = make_filepath(RootDir, DbName, Ext), + couch_db_engine:delete_compaction_files(Engine, RootDir, FPath, DelOpts) + end, get_configured_engines()), + ok; +delete_compaction_files(DbName, DelOpts) when is_binary(DbName) -> + delete_compaction_files(?b2l(DbName), DelOpts). + maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) -> maybe_add_sys_db_callbacks(?b2l(DbName), Options); maybe_add_sys_db_callbacks(DbName, Options) -> @@ -165,9 +197,6 @@ is_admin(User, ClearPwd) -> has_admins() -> config:get("admins") /= []. -get_full_filename(Server, DbName) -> - filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]). - hash_admin_passwords() -> hash_admin_passwords(true). @@ -185,6 +214,7 @@ init([]) -> % will restart us and then we will pick up the new settings. RootDir = config:get("couchdb", "database_dir", "."), + Engines = get_configured_engines(), MaxDbsOpen = list_to_integer( config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))), UpdateLruOnRead = @@ -196,6 +226,7 @@ init([]) -> ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, + engines = Engines, max_dbs_open=MaxDbsOpen, update_lru_on_read=UpdateLruOnRead, start_time=couch_util:rfc1123_date()}}. @@ -220,6 +251,8 @@ handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) -> {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})}; handle_config_change("couchdb", "max_dbs_open", _, _, _) -> {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})}; +handle_config_change("couchdb_engines", _, _, _, _) -> + {ok, gen_server:call(couch_server,reload_engines)}; handle_config_change("admins", _, _, Persist, _) -> % spawn here so couch event manager doesn't deadlock {ok, spawn(fun() -> hash_admin_passwords(Persist) end)}; @@ -254,11 +287,15 @@ all_databases() -> all_databases(Fun, Acc0) -> {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), NormRoot = couch_util:normpath(Root), - FinalAcc = try - filelib:fold_files(Root, + Extensions = get_engine_extensions(), + ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")", + RegExp = "^[a-z0-9\\_\\$()\\+\\-]*" % stock CouchDB name regex "(\\.[0-9]{10,})?" % optional shard timestamp - "\\.couch$", % filename extension + "\\." ++ ExtRegExp ++ "$", % filename extension + FinalAcc = try + couch_util:fold_files(Root, + RegExp, true, fun(Filename, AccIn) -> NormFilename = couch_util:normpath(Filename), @@ -266,7 +303,8 @@ all_databases(Fun, Acc0) -> [$/ | RelativeFilename] -> ok; RelativeFilename -> ok end, - case Fun(couch_util:drop_dot_couch_ext(?l2b(RelativeFilename)), AccIn) of + Ext = filename:extension(RelativeFilename), + case Fun(?l2b(filename:rootname(RelativeFilename, Ext)), AccIn) of {ok, NewAcc} -> NewAcc; {stop, NewAcc} -> throw({stop, Fun, NewAcc}) end @@ -293,11 +331,11 @@ maybe_close_lru_db(#server{lru=Lru}=Server) -> {error, all_dbs_active} end. -open_async(Server, From, DbName, Filepath, Options) -> +open_async(Server, From, DbName, {Module, Filepath}, Options) -> Parent = self(), T0 = os:timestamp(), Opener = spawn_link(fun() -> - Res = couch_db:start_link(DbName, Filepath, Options), + Res = couch_db:start_link(Module, DbName, Filepath, Options), case {Res, lists:member(create, Options)} of {{ok, _Db}, true} -> couch_event:notify(DbName, created); @@ -334,6 +372,8 @@ handle_call({set_update_lru_on_read, UpdateOnRead}, _From, Server) -> {reply, ok, Server#server{update_lru_on_read=UpdateOnRead}}; handle_call({set_max_dbs_open, Max}, _From, Server) -> {reply, ok, Server#server{max_dbs_open=Max}}; +handle_call(reload_engines, _From, Server) -> + {reply, ok, Server#server{engines = get_configured_engines()}}; handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> @@ -351,7 +391,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters], % Cancel the creation request if it exists. case ReqType of - {create, DbName, _Filepath, _Options, CrFrom} -> + {create, DbName, _Engine, _Options, CrFrom} -> gen_server:reply(CrFrom, file_exists); _ -> ok @@ -386,8 +426,8 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, FromPid), NewServer = case ReqType of - {create, DbName, Filepath, Options, CrFrom} -> - open_async(Server, CrFrom, DbName, Filepath, Options); + {create, DbName, Engine, Options, CrFrom} -> + open_async(Server, CrFrom, DbName, Engine, Options); _ -> Server end, @@ -401,8 +441,8 @@ handle_call({open, DbName, Options}, From, Server) -> ok -> case make_room(Server, Options) of {ok, Server2} -> - Filepath = get_full_filename(Server, DbNameList), - {noreply, open_async(Server2, From, DbName, Filepath, Options)}; + Engine = get_engine(Server, DbNameList), + {noreply, open_async(Server2, From, DbName, Engine, Options)}; CloseError -> {reply, CloseError, Server} end; @@ -421,14 +461,14 @@ handle_call({open, DbName, Options}, From, Server) -> end; handle_call({create, DbName, Options}, From, Server) -> DbNameList = binary_to_list(DbName), - Filepath = get_full_filename(Server, DbNameList), + Engine = get_engine(Server, DbNameList, Options), case check_dbname(Server, DbNameList) of ok -> case ets:lookup(couch_dbs, DbName) of [] -> case make_room(Server, Options) of {ok, Server2} -> - {noreply, open_async(Server2, From, DbName, Filepath, + {noreply, open_async(Server2, From, DbName, Engine, [create | Options])}; CloseError -> {reply, CloseError, Server} @@ -438,7 +478,7 @@ handle_call({create, DbName, Options}, From, Server) -> % the middle of trying to open it. We allow one creator % to wait while we figure out if it'll succeed. CrOptions = [create | Options], - Req = {create, DbName, Filepath, CrOptions, From}, + Req = {create, DbName, Engine, CrOptions, From}, true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), {noreply, Server}; [_AlreadyRunningDb] -> @@ -451,7 +491,6 @@ handle_call({delete, DbName, Options}, _From, Server) -> DbNameList = binary_to_list(DbName), case check_dbname(Server, DbNameList) of ok -> - FullFilepath = get_full_filename(Server, DbNameList), Server2 = case ets:lookup(couch_dbs, DbName) of [] -> Server; @@ -468,18 +507,16 @@ handle_call({delete, DbName, Options}, _From, Server) -> db_closed(Server, Entry#entry.db_options) end, - %% Delete any leftover compaction files. If we don't do this a - %% subsequent request for this DB will try to open them to use - %% as a recovery. - lists:foreach(fun(Ext) -> - couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext) - end, [".compact", ".compact.data", ".compact.meta"]), - couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"), - couch_db_plugin:on_delete(DbName, Options), DelOpt = [{context, delete} | Options], - case couch_file:delete(Server#server.root_dir, FullFilepath, DelOpt) of + + % Make sure and remove all compaction data + delete_compaction_files(DbNameList, DelOpt), + + {Engine, FilePath} = get_engine(Server, DbNameList), + RootDir = Server#server.root_dir, + case couch_db_engine:delete(Engine, RootDir, FilePath, DelOpt) of ok -> couch_event:notify(DbName, deleted), {reply, ok, Server2}; @@ -558,6 +595,106 @@ db_closed(Server, Options) -> true -> Server end. + +get_configured_engines() -> + ConfigEntries = config:get("couchdb_engines"), + Engines = lists:flatmap(fun({Extension, ModuleStr}) -> + try + [{Extension, list_to_atom(ModuleStr)}] + catch _T:_R -> + [] + end + end, ConfigEntries), + case Engines of + [] -> + [{"couch", couch_bt_engine}]; + Else -> + Else + end. + + +get_engine(Server, DbName, Options) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + case couch_util:get_value(engine, Options) of + Ext when is_binary(Ext) -> + ExtStr = binary_to_list(Ext), + case couch_util:get_value(ExtStr, Engines) of + Engine when is_atom(Engine) -> + Path = make_filepath(RootDir, DbName, ExtStr), + {Engine, Path}; + _ -> + get_engine(Server, DbName) + end; + _ -> + get_engine(Server, DbName) + end. + + +get_engine(Server, DbName) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + Possible = lists:foldl(fun({Extension, Engine}, Acc) -> + Path = make_filepath(RootDir, DbName, Extension), + case couch_db_engine:exists(Engine, Path) of + true -> + [{Engine, Path} | Acc]; + false -> + Acc + end + end, [], Engines), + case Possible of + [] -> + get_default_engine(Server, DbName); + [Engine] -> + Engine; + _ -> + erlang:error(engine_conflict) + end. + + +get_default_engine(Server, DbName) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + Default = {couch_bt_engine, make_filepath(RootDir, DbName, "couch")}, + case config:get("couchdb", "default_engine") of + Extension when is_list(Extension) -> + case lists:keyfind(Extension, 1, Engines) of + {Extension, Module} -> + {Module, make_filepath(RootDir, DbName, Extension)}; + false -> + Default + end; + _ -> + Default + end. + + +make_filepath(RootDir, DbName, Extension) when is_binary(RootDir) -> + make_filepath(binary_to_list(RootDir), DbName, Extension); +make_filepath(RootDir, DbName, Extension) when is_binary(DbName) -> + make_filepath(RootDir, binary_to_list(DbName), Extension); +make_filepath(RootDir, DbName, Extension) when is_binary(Extension) -> + make_filepath(RootDir, DbName, binary_to_list(Extension)); +make_filepath(RootDir, DbName, Extension) -> + filename:join([RootDir, "./" ++ DbName ++ "." ++ Extension]). + + +get_engine_extensions() -> + case config:get("couchdb_engines") of + [] -> + ["couch"]; + Entries -> + [Ext || {Ext, _Mod} <- Entries] + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_stream.erl ---------------------------------------------------------------------- diff --git a/src/couch_stream.erl b/src/couch_stream.erl index 913977f..1980246 100644 --- a/src/couch_stream.erl +++ b/src/couch_stream.erl @@ -14,21 +14,40 @@ -behaviour(gen_server). -vsn(1). -% public API --export([open/1, open/2, close/1]). --export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]). --export([copy_to_new_stream/3, write/2]). -% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_cast/2, handle_call/3, handle_info/2]). +-export([ + open/1, + open/2, + close/1, + + copy/2, + write/2, + to_disk_term/1, + + foldl/3, + foldl/4, + foldl_decode/5, + range_foldl/5 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + -include_lib("couch/include/couch_db.hrl"). + -define(DEFAULT_BUFFER_SIZE, 4096). --record(stream, - {fd = 0, + +-record(stream, { + engine, opener_monitor, written_pointers=[], buffer_list = [], @@ -42,114 +61,94 @@ identity_len = 0, encoding_fun, end_encoding_fun - }). +}). -%%% Interface functions %%% +open({_StreamEngine, _StreamEngineState} = Engine) -> + open(Engine, []). -open(Fd) -> - open(Fd, []). -open(Fd, Options) -> - gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []). +open({_StreamEngine, _StreamEngineState} = Engine, Options) -> + gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []). + close(Pid) -> gen_server:call(Pid, close, infinity). -copy_to_new_stream(Fd, PosList, DestFd) -> - {ok, Dest} = open(DestFd), - foldl(Fd, PosList, - fun(Bin, _) -> - ok = write(Dest, Bin) - end, ok), - close(Dest). - -foldl(_Fd, [], _Fun, Acc) -> - Acc; -foldl(Fd, [Pos|Rest], Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Fun, Fun(Bin, Acc)). - -foldl(Fd, PosList, <<>>, Fun, Acc) -> - foldl(Fd, PosList, Fun, Acc); -foldl(Fd, PosList, Md5, Fun, Acc) -> - foldl(Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc). - -foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + +copy(Src, Dst) -> + foldl(Src, fun(Bin, _) -> + ok = write(Dst, Bin) + end, ok). + + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> + gen_server:call(Pid, {write, Bin}, infinity). + + +to_disk_term({Engine, EngineState}) -> + Engine:to_disk_term(EngineState). + + +foldl({Engine, EngineState}, Fun, Acc) -> + Engine:foldl(EngineState, Fun, Acc). + + +foldl(Engine, <<>>, Fun, Acc) -> + foldl(Engine, Fun, Acc); +foldl(Engine, Md5, UserFun, UserAcc) -> + InitAcc = {couch_crypto:hash_init(md5), UserFun, UserAcc}, + {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc), + Md5 = couch_crypto:hash_final(md5, Md5Acc), + OutAcc. + + +foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) -> {DecDataFun, DecEndFun} = case Enc of - gzip -> - ungzip_init(); - identity -> - identity_enc_dec_funs() + gzip -> ungzip_init(); + identity -> identity_enc_dec_funs() end, - Result = foldl_decode( - DecDataFun, Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc - ), + InitAcc = {DecDataFun, UserFun, UserAcc1}, + {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc), DecEndFun(), - Result. + UserAcc2. + + +range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From -> + NewEngine = do_seek(Engine, From), + InitAcc = {To - From, UserFun, UserAcc}, + try + {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc), + UserAcc2 + catch + throw:{finished, UserAcc3} -> + UserAcc3 + end. -foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = couch_crypto:hash_final(md5, Md5Acc), - Acc; -foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE - foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, Bin)), - Fun(Bin, Acc); -foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Md5, couch_crypto:hash_update(md5, Md5Acc, Bin), Fun, Fun(Bin, Acc)). - -range_foldl(Fd, PosList, From, To, Fun, Acc) -> - range_foldl(Fd, PosList, From, To, 0, Fun, Acc). - -range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To -> - Acc; -range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc); -range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size -> - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc); -range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Bin1 = if - From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered - true -> - PrefixLen = clip(From - Off, 0, Size), - PostfixLen = clip(Off + Size - To, 0, Size), - MatchLen = Size - PrefixLen - PostfixLen, - <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin), - Match - end, - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)). -clip(Value, Lo, Hi) -> - if - Value < Lo -> Lo; - Value > Hi -> Hi; - true -> Value +foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) -> + NewMd5Acc = couch_crypto:hash_update(md5, Md5Acc, Bin), + {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}. + + +foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) -> + case DecFun(EncBin) of + <<>> -> {DecFun, UserFun, UserAcc}; + Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)} end. -foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = couch_crypto:hash_final(md5, Md5Acc), - Acc; -foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, EncBin)), - Bin = DecFun(EncBin), - Fun(Bin, Acc); -foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Bin = DecFun(EncBin), - Md5Acc2 = couch_crypto:hash_update(md5, Md5Acc, EncBin), - foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +foldl_length(Bin, {Length, UserFun, UserAcc}) -> + BinSize = size(Bin), + case BinSize =< Length of + true -> + {Length - BinSize, UserFun, UserFun(Bin, UserAcc)}; + false -> + <> = Bin, + throw({finished, UserFun(Trunc, UserAcc)}) + end. gzip_init(Options) -> case couch_util:get_value(compression_level, Options, 0) of @@ -192,23 +191,16 @@ identity_enc_dec_funs() -> fun() -> [] end }. -write(_Pid, <<>>) -> - ok; -write(Pid, Bin) -> - gen_server:call(Pid, {write, Bin}, infinity). - -init({Fd, OpenerPid, OpenerPriority, Options}) -> +init({Engine, OpenerPid, OpenerPriority, Options}) -> erlang:put(io_priority, OpenerPriority), {EncodingFun, EndEncodingFun} = case couch_util:get_value(encoding, Options, identity) of - identity -> - identity_enc_dec_funs(); - gzip -> - gzip_init(Options) + identity -> identity_enc_dec_funs(); + gzip -> gzip_init(Options) end, {ok, #stream{ - fd=Fd, + engine=Engine, opener_monitor=erlang:monitor(process, OpenerPid), md5=couch_crypto:hash_init(md5), identity_md5=couch_crypto:hash_init(md5), @@ -225,9 +217,8 @@ terminate(_Reason, _Stream) -> handle_call({write, Bin}, _From, Stream) -> BinSize = iolist_size(Bin), #stream{ - fd = Fd, + engine = Engine, written_len = WrittenLen, - written_pointers = Written, buffer_len = BufferLen, buffer_list = Buffer, max_buffer = Max, @@ -242,19 +233,18 @@ handle_call({write, Bin}, _From, Stream) -> [] -> % case where the encoder did some internal buffering % (zlib does it for example) + NewEngine = Engine, WrittenLen2 = WrittenLen, - Md5_2 = Md5, - Written2 = Written; + Md5_2 = Md5; WriteBin2 -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), + NewEngine = do_write(Engine, WriteBin2), WrittenLen2 = WrittenLen + iolist_size(WriteBin2), - Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2), - Written2 = [{Pos, iolist_size(WriteBin2)}|Written] + Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2) end, {reply, ok, Stream#stream{ + engine = NewEngine, written_len=WrittenLen2, - written_pointers=Written2, buffer_list=[], buffer_len=0, md5=Md5_2, @@ -268,10 +258,9 @@ handle_call({write, Bin}, _From, Stream) -> end; handle_call(close, _From, Stream) -> #stream{ - fd = Fd, + engine = Engine, opener_monitor = MonRef, written_len = WrittenLen, - written_pointers = Written, buffer_list = Buffer, md5 = Md5, identity_md5 = IdenMd5, @@ -285,12 +274,11 @@ handle_call(close, _From, Stream) -> Md5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5, WriteBin2)), Result = case WriteBin2 of [] -> - {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; + {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), - StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]), + NewEngine = do_write(Engine, WriteBin2), StreamLen = WrittenLen + iolist_size(WriteBin2), - {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} + {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final} end, erlang:demonitor(MonRef), {stop, normal, Result, Stream}. @@ -305,3 +293,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) -> {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. + + +do_seek({Engine, EngineState}, Offset) -> + {ok, NewState} = Engine:seek(EngineState, Offset), + {Engine, NewState}. + +do_write({Engine, EngineState}, Data) -> + {ok, NewState} = Engine:write(EngineState, Data), + {Engine, NewState}. + +do_finalize({Engine, EngineState}) -> + {ok, NewState} = Engine:finalize(EngineState), + {Engine, NewState}. + http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/couch_util.erl ---------------------------------------------------------------------- diff --git a/src/couch_util.erl b/src/couch_util.erl index d688c12..dc2ef64 100644 --- a/src/couch_util.erl +++ b/src/couch_util.erl @@ -12,7 +12,7 @@ -module(couch_util). --export([priv_dir/0, normpath/1]). +-export([priv_dir/0, normpath/1, fold_files/5]). -export([should_flush/0, should_flush/1, to_existing_atom/1]). -export([rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, drop_dot_couch_ext/1]). @@ -33,6 +33,7 @@ -export([find_in_binary/2]). -export([callback_exists/3, validate_callback_exists/3]). -export([with_proc/4]). +-export([check_md5/2]). -include_lib("couch/include/couch_db.hrl"). @@ -62,6 +63,37 @@ normparts(["." | RestParts], Acc) -> normparts([Part | RestParts], Acc) -> normparts(RestParts, [Part | Acc]). + +fold_files(Dir, RegExp, Recursive, Fun, Acc) -> + {ok, Re} = re:compile(RegExp, [unicode]), + fold_files1(Dir, Re, Recursive, Fun, Acc). + +fold_files1(Dir, RegExp, Recursive, Fun, Acc) -> + case file:list_dir(Dir) of + {ok, Files} -> + fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc); + {error, _} -> + Acc + end. + +fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc) -> + Acc; +fold_files2([File | Rest], Dir, RegExp, Recursive, Fun, Acc0) -> + FullName = filename:join(Dir, File), + case (catch re:run(File, RegExp, [{capture, none}])) of + match -> + Acc1 = Fun(FullName, Acc0), + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1); + _ -> + case Recursive andalso filelib:is_dir(FullName) of + true -> + Acc1 = fold_files1(FullName, RegExp, Recursive, Fun, Acc0), + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1); + false -> + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc0) + end + end. + % works like list_to_existing_atom, except can be list or binary and it % gives you the original value instead of an error if no existing atom. to_existing_atom(V) when is_list(V) -> @@ -578,6 +610,12 @@ validate_callback_exists(Module, Function, Arity) -> {undefined_callback, CallbackStr, {Module, Function, Arity}}}) end. + +check_md5(_NewSig, <<>>) -> ok; +check_md5(Sig, Sig) -> ok; +check_md5(_, _) -> throw(md5_mismatch). + + ensure_loaded(Module) when is_atom(Module) -> case code:ensure_loaded(Module) of {module, Module} -> http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/src/test_util.erl ---------------------------------------------------------------------- diff --git a/src/test_util.erl b/src/test_util.erl index b5bb232..6bf7b46 100644 --- a/src/test_util.erl +++ b/src/test_util.erl @@ -34,6 +34,8 @@ -export([start/1, start/2, start/3, stop/1]). +-export([fake_db/1]). + -record(test_context, {mocked = [], started = [], module}). -define(DEFAULT_APPS, http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/c05c1282/test/couch_db_plugin_tests.erl ---------------------------------------------------------------------- diff --git a/test/couch_db_plugin_tests.erl b/test/couch_db_plugin_tests.erl index 94dd3df..52533fe 100644 --- a/test/couch_db_plugin_tests.erl +++ b/test/couch_db_plugin_tests.erl @@ -43,7 +43,7 @@ data_providers() -> []. data_subscriptions() -> []. processes() -> []. notify(_, _, _) -> ok. -fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)). +fake_db() -> test_util:fake_db([]). setup() -> couch_tests:setup([