From: davisp@apache.org
To: commits@couchdb.apache.org
Date: Wed, 01 Feb 2017 17:46:43 -0000
Subject: [5/6] couch commit: updated refs/heads/COUCHDB-3287-pluggable-storage-engines to 7f90b57

Implement pluggable storage engines

This change moves the main work of storage engines to run through the new
couch_db_engine behavior. This allows us to replace the storage engine with
different implementations that can be tailored to specific workloads and
environments.

Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/a7c6713d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/a7c6713d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/a7c6713d

Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: a7c6713d2f4fa763e132bf13b35417cca66b655b
Parents: 60633f5
Author: Paul J. Davis
Authored: Fri Feb 5 12:04:20 2016 -0600
Committer: Paul J. Davis
Committed: Wed Feb 1 11:42:50 2017 -0600
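For orientation, the sketch below shows the rough shape of an engine callback module. It is illustrative only and is not part of this patch: my_engine and its #ngst{} state record are hypothetical, and the callback names simply mirror the couch_db_engine calls that appear throughout the diff (get/2, open_docs/2, and friends).

    %% A minimal, hypothetical storage-engine sketch. Only a couple of
    %% the callbacks exercised in this diff are shown; a real engine
    %% implements the full couch_db_engine contract.
    -module(my_engine).
    -export([get/2, open_docs/2]).

    %% Hypothetical engine state: a real engine would hold file handles,
    %% caches, etc. Here docs live in an orddict keyed by doc id.
    -record(ngst, {
        update_seq = 0,
        docs = orddict:new()
    }).

    %% Mirrors couch_db_engine:get(Db, Property) as used in this commit
    %% (update_seq, doc_count, purge_seq, ...).
    get(#ngst{update_seq = Seq}, update_seq) ->
        Seq;
    get(#ngst{docs = Docs}, doc_count) ->
        orddict:size(Docs).

    %% Mirrors couch_db_engine:open_docs/2: one #full_doc_info{} per id,
    %% or the atom not_found for missing ids.
    open_docs(#ngst{docs = Docs}, Ids) ->
        [case orddict:find(Id, Docs) of
            {ok, FDI} -> FDI;
            error -> not_found
        end || Id <- Ids].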
----------------------------------------------------------------------
 include/couch_db.hrl                     |   38 +-
 src/couch_att.erl                        |  132 ++-
 src/couch_auth_cache.erl                 |   17 +-
 src/couch_changes.erl                    |   25 +-
 src/couch_compaction_daemon.erl          |   32 +-
 src/couch_db.erl                         |  759 ++++++++-------
 src/couch_db_engine.erl                  |    3 -
 src/couch_db_updater.erl                 | 1275 ++++++-------------------
 src/couch_httpd_db.erl                   |    8 +-
 src/couch_httpd_misc_handlers.erl        |   13 -
 src/couch_lru.erl                        |   10 +-
 src/couch_server.erl                     |  239 ++++-
 src/couch_stream.erl                     |  256 ++---
 src/couch_util.erl                       |   40 +-
 test/couch_stream_tests.erl              |   30 +-
 test/couchdb_compaction_daemon_tests.erl |    4 +-
 test/couchdb_views_tests.erl             |   45 +-
 17 files changed, 1250 insertions(+), 1676 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/include/couch_db.hrl
----------------------------------------------------------------------
diff --git a/include/couch_db.hrl b/include/couch_db.hrl
index e7cd85d..03d7cc4 100644
--- a/include/couch_db.hrl
+++ b/include/couch_db.hrl
@@ -78,7 +78,8 @@
     update_seq = 0,
     deleted = false,
     rev_tree = [],
-    sizes = #size_info{}
+    sizes = #size_info{},
+    meta = []
 }).

 -record(httpd, {
@@ -129,30 +130,29 @@
 }).

 -record(db, {
+    name,
+    filepath,
+
+    engine = {couch_bt_engine, undefined},
+
     main_pid = nil,
     compactor_pid = nil,
-    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
-    fd,
-    fd_monitor,
-    header = couch_db_header:new(),
+
     committed_update_seq,
-    id_tree,
-    seq_tree,
-    local_tree,
-    update_seq,
-    name,
-    filepath,
-    validate_doc_funs = undefined,
-    security = [],
-    security_ptr = nil,
+
+    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+
     user_ctx = #user_ctx{},
+    security = [],
+    validate_doc_funs = undefined,
+
+    before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
+    after_doc_read = nil,    % nil | fun(Doc, Db) -> NewDoc
+
     waiting_delayed_commit = nil,
-    revs_limit = 1000,
-    fsync_options = [],
+
     options = [],
-    compression,
-    before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
-    after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
+    compression
 }).

 -record(view_fold_helper_funs, {

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_att.erl
----------------------------------------------------------------------
diff --git a/src/couch_att.erl b/src/couch_att.erl
index 9d38cfa..8ea1e45 100644
--- a/src/couch_att.erl
+++ b/src/couch_att.erl
@@ -18,7 +18,8 @@
     fetch/2,
     store/2,
     store/3,
-    transform/3
+    transform/3,
+    copy/2
 ]).

 -export([
@@ -233,6 +234,14 @@ transform(Field, Fun, Att) ->
     store(Field, NewValue, Att).


+copy(Att, DstStream) ->
+    [{stream, SrcStream}, AttLen, OldMd5] = fetch([data, att_len, md5], Att),
+    ok = couch_stream:copy(SrcStream, DstStream),
+    {NewStream, AttLen, _, NewMd5, _} = couch_stream:close(DstStream),
+    couch_util:check_md5(OldMd5, NewMd5),
+    store(data, {stream, NewStream}, Att).
+
+
 is_stub(Att) ->
     stub == fetch(data, Att).

@@ -292,11 +301,12 @@ size_info(Atts) ->
 %% as safe as possible, avoiding the need for complicated disk versioning
 %% schemes.
 to_disk_term(#att{} = Att) ->
-    {_, StreamIndex} = fetch(data, Att),
+    {stream, StreamEngine} = fetch(data, Att),
+    {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
     {
         fetch(name, Att),
         fetch(type, Att),
-        StreamIndex,
+        Sp,
         fetch(att_len, Att),
         fetch(disk_len, Att),
         fetch(revpos, Att),
@@ -309,9 +319,13 @@ to_disk_term(Att) ->
         fun
             (data, {Props, Values}) ->
                 case lists:keytake(data, 1, Props) of
-                    {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]};
-                    {value, {_, Value}, Other} -> {Other, [Value | Values]};
-                    false -> {Props, [undefined |Values ]}
+                    {value, {_, {stream, StreamEngine}}, Other} ->
+                        {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
+                        {Other, [Sp | Values]};
+                    {value, {_, Value}, Other} ->
+                        {Other, [Value | Values]};
+                    false ->
+                        {Props, [undefined |Values ]}
                 end;
             (Key, {Props, Values}) ->
                 case lists:keytake(Key, 1, Props) of
@@ -332,9 +346,11 @@ to_disk_term(Att) ->
 %% compression to remove these sorts of common bits (block level compression
 %% with something like a shared dictionary that is checkpointed every now and
 %% then).
-from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) ->
-    store(Extended, from_disk_term(Fd, Base));
-from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+from_disk_term(StreamSrc, {Base, Extended})
+        when is_tuple(Base), is_list(Extended) ->
+    store(Extended, from_disk_term(StreamSrc, Base));
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+    {ok, Stream} = open_stream(StreamSrc, Sp),
     #att{
         name=Name,
         type=Type,
@@ -342,10 +358,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
         disk_len=DiskLen,
         md5=Md5,
         revpos=RevPos,
-        data={Fd,Sp},
+        data={stream, Stream},
         encoding=upgrade_encoding(Enc)
     };
-from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+    {ok, Stream} = open_stream(StreamSrc, Sp),
     #att{
         name=Name,
         type=Type,
@@ -353,9 +370,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
         disk_len=AttLen,
         md5=Md5,
         revpos=RevPos,
-        data={Fd,Sp}
+        data={stream, Stream}
     };
-from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
+from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
+    {ok, Stream} = open_stream(StreamSrc, Sp),
     #att{
         name=Name,
         type=Type,
@@ -363,7 +381,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
         disk_len=AttLen,
         md5= <<>>,
         revpos=0,
-        data={Fd,Sp}
+        data={stream, Stream}
     }.

@@ -477,32 +495,18 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
     {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.

-flush(Fd, Att) ->
-    flush_data(Fd, fetch(data, Att), Att).
+flush(Db, Att) ->
+    flush_data(Db, fetch(data, Att), Att).
-flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd ->
-    % already written to our file, nothing to write
-    Att;
-flush_data(Fd, {OtherFd, StreamPointer}, Att) ->
-    [InMd5, InDiskLen] = fetch([md5, disk_len], Att),
-    {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
-            couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
-    couch_db:check_md5(IdentityMd5, InMd5),
-    store([
-        {data, {Fd, NewStreamData}},
-        {md5, Md5},
-        {att_len, Len},
-        {disk_len, InDiskLen}
-    ], Att);
-flush_data(Fd, Data, Att) when is_binary(Data) ->
-    couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+flush_data(Db, Data, Att) when is_binary(Data) ->
+    couch_db:with_stream(Db, Att, fun(OutputStream) ->
         couch_stream:write(OutputStream, Data)
     end);
-flush_data(Fd, Fun, Att) when is_function(Fun) ->
+flush_data(Db, Fun, Att) when is_function(Fun) ->
     case fetch(att_len, Att) of
         undefined ->
-            couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+            couch_db:with_stream(Db, Att, fun(OutputStream) ->
                 % Fun(MaxChunkSize, WriterFun) must call WriterFun
                 % once for each chunk of the attachment,
                 Fun(4096,
@@ -523,11 +527,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) ->
                 end, ok)
             end);
         AttLen ->
-            couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+            couch_db:with_stream(Db, Att, fun(OutputStream) ->
                 write_streamed_attachment(OutputStream, Fun, AttLen)
             end)
     end;
-flush_data(Fd, {follows, Parser, Ref}, Att) ->
+flush_data(Db, {follows, Parser, Ref}, Att) ->
     ParserRef = erlang:monitor(process, Parser),
     Fun = fun() ->
         Parser ! {get_bytes, Ref, self()},
@@ -541,9 +545,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) ->
         end
     end,
     try
-        flush_data(Fd, Fun, store(data, Fun, Att))
+        flush_data(Db, Fun, store(data, Fun, Att))
     after
         erlang:demonitor(ParserRef, [flush])
+    end;
+flush_data(Db, {stream, StreamEngine}, Att) ->
+    case couch_db:is_active_stream(Db, StreamEngine) of
+        true ->
+            % Already written
+            Att;
+        false ->
+            NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
+                couch_stream:copy(StreamEngine, OutputStream)
+            end),
+            InMd5 = fetch(md5, Att),
+            OutMd5 = fetch(md5, NewAtt),
+            couch_util:check_md5(OutMd5, InMd5),
+            NewAtt
     end.

@@ -572,9 +590,9 @@ foldl(Att, Fun, Acc) ->

 foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
     Fun(Bin, Acc);
-foldl({Fd, Sp}, Att, Fun, Acc) ->
+foldl({stream, StreamEngine}, Att, Fun, Acc) ->
     Md5 = fetch(md5, Att),
-    couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+    couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
 foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
     Len = fetch(att_len, Att),
     fold_streamed_data(DataFun, Len, Fun, Acc);
@@ -599,14 +617,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->

 range_foldl(Att, From, To, Fun, Acc) ->
-    {Fd, Sp} = fetch(data, Att),
-    couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+    {stream, StreamEngine} = fetch(data, Att),
+    couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).

 foldl_decode(Att, Fun, Acc) ->
     case fetch([data, encoding], Att) of
-        [{Fd, Sp}, Enc] ->
-            couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc);
+        [{stream, StreamEngine}, Enc] ->
+            couch_stream:foldl_decode(
+                    StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
         [Fun2, identity] ->
             fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
     end.
@@ -620,7 +639,7 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
     Bin;
 to_binary(Iolist, _Att) when is_list(Iolist) ->
     iolist_to_binary(Iolist);
-to_binary({_Fd,_Sp}, Att) ->
+to_binary({stream, _StreamEngine}, Att) ->
     iolist_to_binary(
         lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
     );
@@ -680,9 +699,25 @@ upgrade_encoding(false) -> identity;
 upgrade_encoding(Encoding) -> Encoding.

+open_stream(StreamSrc, Data) ->
+    case couch_db:is_db(StreamSrc) of
+        true ->
+            couch_db:open_read_stream(StreamSrc, Data);
+        false ->
+            case is_function(StreamSrc, 1) of
+                true ->
+                    StreamSrc(Data);
+                false ->
+                    erlang:error({invalid_stream_source, StreamSrc})
+            end
+    end.
+
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").

+% Eww...
+-include("couch_bt_engine.hrl").

 %% Test utilities

@@ -737,7 +772,7 @@ attachment_disk_term_test_() ->
         {disk_len, 0},
         {md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>},
         {revpos, 4},
-        {data, {fake_fd, fake_sp}},
+        {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}},
         {encoding, identity}
     ]),
     BaseDiskTerm = {
@@ -751,11 +786,14 @@ attachment_disk_term_test_() ->
     Headers = [{<<"X-Foo">>, <<"bar">>}],
     ExtendedAttachment = store(headers, Headers, BaseAttachment),
     ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]},
+    FakeDb = #db{
+        engine = {couch_bt_engine, #st{fd = fake_fd}}
+    },
     {"Disk term tests", [
         ?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)),
-        ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)),
+        ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)),
         ?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)),
-        ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm))
+        ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm))
     ]}.
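The net effect in couch_att is that an attachment's data field now carries {stream, StreamEngine} instead of a raw {Fd, Sp} pair, with open_stream/2 dispatching on whether the stream source is a #db{} handle or a 1-arity fun. A minimal caller sketch, mirroring what to_binary/2 above already does (Att is assumed to be an attachment fetched from an open db):

    %% Fold the attachment's stream into one binary; couch_att:foldl/3
    %% hides the engine behind the {stream, StreamEngine} data term.
    AttBin = iolist_to_binary(lists:reverse(
        couch_att:foldl(Att, fun(Chunk, Acc) -> [Chunk | Acc] end, [])
    )).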
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_auth_cache.erl
----------------------------------------------------------------------
diff --git a/src/couch_auth_cache.erl b/src/couch_auth_cache.erl
index 9b00a9d..6f25665 100644
--- a/src/couch_auth_cache.erl
+++ b/src/couch_auth_cache.erl
@@ -331,15 +331,12 @@ refresh_entries(AuthDb) ->
     nil ->
         ok;
     AuthDb2 ->
-        case AuthDb2#db.update_seq > AuthDb#db.update_seq of
+        OldSeq = couch_db:get_update_seq(AuthDb),
+        NewSeq = couch_db:get_update_seq(AuthDb2),
+        case NewSeq > OldSeq of
         true ->
-            {ok, _, _} = couch_db:enum_docs_since(
-                AuthDb2,
-                AuthDb#db.update_seq,
-                fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
-                AuthDb#db.update_seq,
-                []
-            ),
+            Fun = fun(DocInfo, _) -> refresh_entry(AuthDb2, DocInfo) end,
+            {ok, _} = couch_db:fold_changes(AuthDb2, OldSeq, Fun, nil),
             true = ets:insert(?STATE, {auth_db, AuthDb2});
         false ->
             ok
@@ -395,7 +392,9 @@ cache_needs_refresh() ->
             nil ->
                 false;
             AuthDb2 ->
-                AuthDb2#db.update_seq > AuthDb#db.update_seq
+                OldSeq = couch_db:get_update_seq(AuthDb),
+                NewSeq = couch_db:get_update_seq(AuthDb2),
+                NewSeq > OldSeq
             end
         end,
         false

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index 52ff39d..5779c12 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -219,7 +219,7 @@ configure_filter("_view", Style, Req, Db) ->
         catch _:_ ->
             view
         end,
-        case Db#db.id_tree of
+        case Db#db.main_pid of
             undefined ->
                 DIR = fabric_util:doc_id_and_rev(DDoc),
                 {fetch, FilterType, Style, DIR, VName};
@@ -242,7 +242,7 @@ configure_filter(FilterName, Style, Req, Db) ->
         [DName, FName] ->
             {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
             check_member_exists(DDoc, [<<"filters">>, FName]),
-            case Db#db.id_tree of
+            case Db#db.main_pid of
                 undefined ->
                     DIR = fabric_util:doc_id_and_rev(DDoc),
                     {fetch, custom, Style, Req, DIR, FName};
@@ -395,7 +395,7 @@ check_fields(_Fields) ->
     throw({bad_request, "Selector error: fields must be JSON array"}).

-open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) ->
+open_ddoc(#db{name=DbName, main_pid=undefined}, DDocId) ->
     case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of
         {ok, _} = Resp -> Resp;
         Else -> throw(Else)
@@ -531,7 +531,8 @@ send_changes(Acc, Dir, FirstRound) ->
         {#mrview{}, {fast_view, _, _, _}} ->
             couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
         {undefined, _} ->
-            couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+            Opts = [doc_info, {dir, Dir}],
+            couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
         {#mrview{}, _} ->
             ViewEnumFun = fun view_changes_enumerator/2,
             {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -566,20 +567,24 @@ can_optimize(_, _) ->

 send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
-    Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
+    Results = couch_db:open_docs(Db, DocIds, [full_doc_info]),
     FullInfos = lists:foldl(fun
-        ({ok, FDI}, Acc) -> [FDI | Acc];
+        (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
         (not_found, Acc) -> Acc
-    end, [], Lookups),
+    end, [], Results),
     send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
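Callers that used enum_docs_since/5 with a three-arity fun, as couch_auth_cache did above, now pass a two-arity fun to couch_db:fold_changes/4,5. A small sketch, assuming Db is an open handle and SinceSeq a known sequence; the fold fun returns {ok, Acc} (or {stop, Acc}) per the conversion funs introduced later in this diff:

    %% Count changed documents since SinceSeq via the new fold API.
    CountFun = fun(_FDI, Count) -> {ok, Count + 1} end,
    {ok, NumChanged} = couch_db:fold_changes(Db, SinceSeq, CountFun, 0).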
 send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
-    FoldFun = fun(FullDocInfo, _, Acc) ->
+    FoldFun = fun(FullDocInfo, Acc) ->
         {ok, [FullDocInfo | Acc]}
     end,
-    KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
-    {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
+    KeyOpts = [
+        include_deleted,
+        {start_key, <<"_design/">>},
+        {end_key_gt, <<"_design0">>}
+    ],
+    {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts),
     send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_compaction_daemon.erl
----------------------------------------------------------------------
diff --git a/src/couch_compaction_daemon.erl b/src/couch_compaction_daemon.erl
index 8f95eb2..77e3f54 100644
--- a/src/couch_compaction_daemon.erl
+++ b/src/couch_compaction_daemon.erl
@@ -236,17 +236,18 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) ->

 db_ddoc_names(Db) ->
-    {ok, _, DDocNames} = couch_db:enum_docs(
-        Db,
-        fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
-            {ok, Acc};
-        (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
-            {ok, [Id | Acc]};
-        (_, _, Acc) ->
-            {stop, Acc}
-        end, [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
+    FoldFun = fun ddoc_name/2,
+    Opts = [{start_key, <<"_design/">>}],
+    {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts),
     DDocNames.

+ddoc_name(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, Acc) ->
+    {ok, Acc};
+ddoc_name(#full_doc_info{id = <<"_design/", Id/binary>>}, Acc) ->
+    {ok, [Id | Acc]};
+ddoc_name(_, Acc) ->
+    {stop, Acc}.
+
 maybe_compact_view(DbName, GroupId, Config) ->
     DDocId = <<"_design/", GroupId/binary>>,
@@ -391,21 +392,22 @@ check_frag(Threshold, Frag) ->

 frag(Props) ->
-    FileSize = couch_util:get_value(disk_size, Props),
+    {Sizes} = couch_util:get_value(sizes, Props),
+    FileSize = couch_util:get_value(file, Sizes),
     MinFileSize = list_to_integer(
         config:get("compaction_daemon", "min_file_size", "131072")),
     case FileSize < MinFileSize of
     true ->
         {0, FileSize};
     false ->
-        case couch_util:get_value(data_size, Props) of
-        null ->
-            {100, FileSize};
+        case couch_util:get_value(active, Sizes) of
         0 ->
             {0, FileSize};
-        DataSize ->
+        DataSize when is_integer(DataSize), DataSize > 0 ->
             Frag = round(((FileSize - DataSize) / FileSize * 100)),
-            {Frag, space_required(DataSize)}
+            {Frag, space_required(DataSize)};
+        _ ->
+            {100, FileSize}
         end
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index 8005e6d..25843d8 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -13,30 +13,40 @@
 -module(couch_db).

 -export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
--export([start_compact/1, cancel_compact/1]).
+-export([get_path/1]).
+-export([shutdown/1]).
+-export([incref/1]).
+-export([start_compact/1, start_compact/2, cancel_compact/1]).
 -export([wait_for_compaction/1, wait_for_compaction/2]).
--export([is_idle/1,monitor/1,count_changes_since/2]).
+-export([is_idle/1,monitor/1,pid/1,compactor_pid/1]).
 -export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
 -export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
--export([open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([open_doc/2,open_doc/3,open_doc_revs/4, open_docs/2, open_docs/3]).
 -export([set_revs_limit/2,get_revs_limit/1]).
+-export([get_user_ctx/1, set_user_ctx/2]).
 -export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
 -export([get_uuid/1, get_epochs/1, get_compacted_seq/1]).
--export([enum_docs/4,enum_docs_since/5]).
--export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
--export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
+-export([get_instance_start_time/1]).
+-export([get_purge_seq/1,purge_docs/2,get_last_purged/1]).
+-export([start_link/4,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
+-export([fold_docs/3, fold_docs/4]).
+-export([fold_changes/4, fold_changes/5, count_changes_since/2]).
 -export([set_security/2,get_security/1]).
--export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
+-export([read_doc/2,new_revid/1]).
 -export([check_is_admin/1, is_admin/1, check_is_member/1, get_doc_count/1]).
--export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
--export([load_validation_funs/1]).
--export([check_md5/2, with_stream/3]).
+-export([reopen/1, is_system_db/1, make_doc/5]).
+-export([load_validation_funs/1, reload_validation_funs/1]).
+-export([with_stream/3]).
 -export([monitored_by/1]).
 -export([normalize_dbname/1]).
 -export([validate_dbname/1]).
 -export([dbname_suffix/1]).
+
+-export([is_db/1]).
+-export([open_write_stream/2, open_read_stream/2, is_active_stream/2]).
+-export([get_before_doc_update/1, get_after_doc_read/1]).
+

 -include_lib("couch/include/couch_db.hrl").

 -define(DBNAME_REGEX,
@@ -44,38 +54,9 @@
     "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
 ).

-start_link(DbName, Filepath, Options) ->
-    case open_db_file(Filepath, Options) of
-    {ok, Fd} ->
-        {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
-            Filepath, Fd, Options}, []),
-        unlink(Fd),
-        gen_server:call(UpdaterPid, get_db);
-    Else ->
-        Else
-    end.
-
-open_db_file(Filepath, Options) ->
-    case couch_file:open(Filepath, Options) of
-    {ok, Fd} ->
-        {ok, Fd};
-    {error, enoent} ->
-        % couldn't find file. is there a compact version? This can happen if
-        % crashed during the file switch.
-        case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
-        {ok, Fd} ->
-            couch_log:info("Found ~s~s compaction file, using as primary"
-                " storage.", [Filepath, ".compact"]),
-            ok = file:rename(Filepath ++ ".compact", Filepath),
-            ok = couch_file:sync(Fd),
-            {ok, Fd};
-        {error, enoent} ->
-            {not_found, no_db_file}
-        end;
-    Error ->
-        Error
-    end.
-
+start_link(Engine, DbName, Filepath, Options) ->
+    Arg = {Engine, DbName, Filepath, Options},
+    proc_lib:start_link(couch_db_updater, init, [Arg]).

 create(DbName, Options) ->
     couch_server:create(DbName, Options).
@@ -87,7 +68,7 @@ open_int(DbName, Options) ->

 % this should be called anytime an http request opens the database.
 % it ensures that the http userCtx is a valid reader
-open(DbName, Options) ->
+open(DbName, Options) when is_binary(DbName) ->
     case couch_server:open(DbName, Options) of
         {ok, Db} ->
             try
@@ -98,23 +79,49 @@
                 close(Db),
                 throw(Error)
             end;
-        Else -> Else
-    end.
+        Else ->
+            Else
+    end;
+open(DbName, Options) ->
+    open(iolist_to_binary(DbName), Options).
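Note the changed process startup: couch_db:start_link/4 now takes the engine as its first argument and defers all initialization to couch_db_updater:init/1 via proc_lib. A hypothetical caller (couch_server in practice) might look roughly like this, with couch_bt_engine being the default engine named in couch_db.hrl above:

    %% Sketch only: the argument shapes follow the new start_link/4 head;
    %% the return value is whatever couch_db_updater acks from init/1.
    start_default_engine(DbName, Filepath, Options) ->
        couch_db:start_link(couch_bt_engine, DbName, Filepath, Options).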
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
-    {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
-    case NewFd =:= Fd of
-    true ->
-        {ok, NewDb#db{user_ctx = UserCtx}};
-    false ->
-        erlang:demonitor(OldRef, [flush]),
-        NewRef = erlang:monitor(process, NewFd),
-        {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
-    end.
+
+reopen(#db{} = Db) ->
+    % We could have just swapped out the storage engine
+    % for this database during a compaction so we just
+    % reimplement this as a close/open pair now.
+    close(Db),
+    open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]).
+
+
+% You shouldn't call this. Its part of the ref counting between
+% couch_server and couch_db instances.
+incref(#db{} = Db) ->
+    couch_db_engine:incref(Db).
+
+
+close(#db{} = Db) ->
+    ok = couch_db_engine:decref(Db).
+
+
+shutdown(#db{} = Db) ->
+    couch_util:shutdown_sync(Db#db.main_pid).
+
+
+is_db(#db{}) -> true;
+is_db(_Else) -> false.
+
+
+pid(#db{} = Db) ->
+    Db#db.main_pid.
+
+
+compactor_pid(#db{} = Db) ->
+    Db#db.compactor_pid.
+

 is_system_db(#db{options = Options}) ->
     lists:member(sys_db, Options).

+get_path(#db{filepath = Path}) ->
+    Path.
+
 ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
     ok = gen_server:call(Pid, full_commit, infinity),
     {ok, StartTime}.
@@ -124,30 +131,35 @@ ensure_full_commit(Db, RequiredSeq) ->
     ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
     {ok, StartTime}.

-close(#db{fd_monitor=Ref}) ->
-    erlang:demonitor(Ref, [flush]),
-    ok.
-
 is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
     monitored_by(Db) == [];
 is_idle(_Db) ->
     false.

 monitored_by(Db) ->
-    case erlang:process_info(Db#db.fd, monitored_by) of
-    undefined ->
-        [];
-    {monitored_by, Pids} ->
-        PidTracker = whereis(couch_stats_process_tracker),
-        Pids -- [Db#db.main_pid, PidTracker]
+    case couch_db_engine:monitored_by(Db) of
+        Pids when is_list(Pids) ->
+            PidTracker = whereis(couch_stats_process_tracker),
+            Pids -- [Db#db.main_pid, PidTracker];
+        undefined ->
+            []
     end.

 monitor(#db{main_pid=MainPid}) ->
     erlang:monitor(process, MainPid).

-start_compact(#db{main_pid=Pid}) ->
-    gen_server:call(Pid, start_compact).
+start_compact(#db{} = Db) ->
+    start_compact(Db, []).
+
+start_compact(#db{} = Db, Opts) ->
+    case lists:keyfind(notify, 1, Opts) of
+        {notify, Pid, Term} ->
+            Db#db.main_pid ! {'$gen_call', {Pid, Term}, start_compact},
+            ok;
+        _ ->
+            gen_server:call(Db#db.main_pid, start_compact)
+    end.

 cancel_compact(#db{main_pid=Pid}) ->
     gen_server:call(Pid, cancel_compact).
@@ -198,6 +210,14 @@ open_doc(Db, Id, Options) ->
         apply_open_options(Else,Options)
     end.

+open_docs(Db, Id) ->
+    open_docs(Db, Id, []).
+
+open_docs(Db, Ids, _Options) ->
+    % TODO: Add support for returning other
+    % types of docs beyond #full_doc_info{}
+    couch_db_engine:open_docs(Db, Ids).
+
 apply_open_options({ok, Doc},Options) ->
     apply_open_options2(Doc,Options);
 apply_open_options(Else,_Options) ->
@@ -247,7 +267,8 @@ get_missing_revs(Db, IdRevsList) ->

 find_missing([], []) ->
     [];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
+        when is_record(FullInfo, full_doc_info) ->
     case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
     [] ->
         find_missing(RestIdRevs, RestLookupInfo);
@@ -275,8 +296,8 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->

 get_doc_info(Db, Id) ->
     case get_full_doc_info(Db, Id) of
-    {ok, DocInfo} ->
-        {ok, couch_doc:to_doc_info(DocInfo)};
+    #full_doc_info{} = FDI ->
+        {ok, couch_doc:to_doc_info(FDI)};
     Else ->
         Else
     end.
@@ -287,10 +308,7 @@ get_full_doc_info(Db, Id) ->
     Result.

 get_full_doc_infos(Db, Ids) ->
-    couch_btree:lookup(Db#db.id_tree, Ids).
-
-increment_update_seq(#db{main_pid=Pid}) ->
-    gen_server:call(Pid, increment_update_seq).
+    couch_db_engine:open_docs(Db, Ids).

 purge_docs(#db{main_pid=Pid}, IdsRevs) ->
     gen_server:call(Pid, {purge_docs, IdsRevs}).
@@ -298,56 +316,56 @@ purge_docs(#db{main_pid=Pid}, IdsRevs) ->
 get_committed_update_seq(#db{committed_update_seq=Seq}) ->
     Seq.

-get_update_seq(#db{update_seq=Seq})->
-    Seq.
+get_user_ctx(#db{} = Db) ->
+    Db#db.user_ctx.
+
+set_user_ctx(#db{} = Db, #user_ctx{} = UserCtx) ->
+    {ok, Db#db{user_ctx = UserCtx}}.
+
+get_update_seq(#db{} = Db)->
+    couch_db_engine:get(Db, update_seq).

 get_purge_seq(#db{}=Db) ->
-    couch_db_header:purge_seq(Db#db.header).
+    {ok, couch_db_engine:get(Db, purge_seq)}.

 get_last_purged(#db{}=Db) ->
-    case couch_db_header:purged_docs(Db#db.header) of
-        nil ->
-            {ok, []};
-        Pointer ->
-            couch_file:pread_term(Db#db.fd, Pointer)
-    end.
+    {ok, couch_db_engine:get(Db, last_purged)}.

 get_doc_count(Db) ->
-    {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
-    {ok, Count}.
+    {ok, couch_db_engine:get(Db, doc_count)}.
+
+get_del_doc_count(Db) ->
+    {ok, couch_db_engine:get(Db, del_doc_count)}.

 get_uuid(#db{}=Db) ->
-    couch_db_header:uuid(Db#db.header).
+    couch_db_engine:get(Db, uuid).

 get_epochs(#db{}=Db) ->
-    couch_db_header:epochs(Db#db.header).
+    couch_db_engine:get(Db, epochs).

 get_compacted_seq(#db{}=Db) ->
-    couch_db_header:compacted_seq(Db#db.header).
+    couch_db_engine:get(Db, compacted_seq).
+
+get_instance_start_time(#db{}=Db) ->
+    Db#db.instance_start_time.
+
+get_before_doc_update(#db{} = Db) ->
+    Db#db.before_doc_update.
+
+get_after_doc_read(#db{} = Db) ->
+    Db#db.after_doc_read.
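With the getters above, metadata that previously required poking at #db{} record fields or btree reductions is read through the engine. A usage sketch against an open Db handle, matching the return shapes in this hunk (get_purge_seq/1 and get_doc_count/1 now return {ok, Value}):

    %% Engine-backed metadata reads, per the new accessor functions.
    {ok, DocCount} = couch_db:get_doc_count(Db),
    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
    UpdateSeq = couch_db:get_update_seq(Db).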
 get_db_info(Db) ->
-    #db{fd=Fd,
-        header=Header,
-        compactor_pid=Compactor,
-        update_seq=SeqNum,
-        name=Name,
-        instance_start_time=StartTime,
-        committed_update_seq=CommittedUpdateSeq,
-        id_tree = IdBtree
+    #db{
+        name = Name,
+        compactor_pid = Compactor,
+        instance_start_time = StartTime,
+        committed_update_seq=CommittedUpdateSeq
     } = Db,
-    {ok, FileSize} = couch_file:bytes(Fd),
-    {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
-    SizeInfo0 = element(3, DbReduction),
-    SizeInfo = case SizeInfo0 of
-        SI when is_record(SI, size_info) ->
-            SI;
-        {AS, ES} ->
-            #size_info{active=AS, external=ES};
-        AS ->
-            #size_info{active=AS}
-    end,
-    ActiveSize = active_size(Db, SizeInfo),
-    DiskVersion = couch_db_header:disk_version(Header),
+    {ok, DocCount} = get_doc_count(Db),
+    {ok, DelDocCount} = get_del_doc_count(Db),
+    SizeInfo = couch_db_engine:get(Db, size_info),
+    DiskVersion = couch_db_engine:get(Db, disk_version),
     Uuid = case get_uuid(Db) of
         undefined -> null;
         Uuid0 -> Uuid0
@@ -358,63 +376,33 @@
     end,
     InfoList = [
         {db_name, Name},
-        {doc_count, element(1, DbReduction)},
-        {doc_del_count, element(2, DbReduction)},
-        {update_seq, SeqNum},
-        {purge_seq, couch_db:get_purge_seq(Db)},
-        {compact_running, Compactor/=nil},
-        {disk_size, FileSize}, % legacy
-        {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
-        {data_size, ActiveSize}, % legacy
-        {sizes, {[
-            {file, FileSize},
-            {active, ActiveSize},
-            {external, SizeInfo#size_info.external}
-        ]}},
+        {engine, couch_db_engine:get(Db, engine)},
+        {doc_count, DocCount},
+        {doc_del_count, DelDocCount},
+        {update_seq, get_update_seq(Db)},
+        {purge_seq, couch_db_engine:get(Db, purge_seq)},
+        {compact_running, Compactor /= nil},
+        {sizes, {SizeInfo}},
         {instance_start_time, StartTime},
         {disk_format_version, DiskVersion},
         {committed_update_seq, CommittedUpdateSeq},
         {compacted_seq, CompactedSeq},
         {uuid, Uuid}
-        ],
+    ],
     {ok, InfoList}.

-active_size(#db{}=Db, Size) when is_integer(Size) ->
-    active_size(Db, #size_info{active=Size});
-active_size(#db{}=Db, #size_info{}=SI) ->
-    Trees = [
-        Db#db.id_tree,
-        Db#db.seq_tree,
-        Db#db.local_tree
-    ],
-    lists:foldl(fun(T, Acc) ->
-        case couch_btree:size(T) of
-            _ when Acc == null ->
-                null;
-            undefined ->
-                null;
-            Size ->
-                Acc + Size
-        end
-    end, SI#size_info.active, Trees).
+
 get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
     {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
     receive {'DOWN', Ref, _, _, Response} ->
         Response
     end;
-get_design_docs(#db{id_tree = IdBtree}) ->
-    FoldFun = pipe([fun skip_deleted/4], fun
-        (#full_doc_info{deleted = true}, _Reds, Acc) ->
-            {ok, Acc};
-        (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
-            {ok, [FullDocInfo | Acc]};
-        (_, _Reds, Acc) ->
-            {stop, Acc}
-    end),
-    KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
-    {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
-    {ok, Docs}.
+get_design_docs(#db{} = Db) ->
+    FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+    {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
+    {ok, lists:reverse(Docs)}.
+

 check_is_admin(#db{user_ctx=UserCtx}=Db) ->
@@ -539,8 +527,8 @@ validate_names_and_roles({Props}) when is_list(Props) ->
     end,
     ok.

-get_revs_limit(#db{revs_limit=Limit}) ->
-    Limit.
+get_revs_limit(#db{} = Db) ->
+    couch_db_engine:get(Db, revs_limit).
 set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
     check_is_admin(Db),
@@ -548,12 +536,10 @@ set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
 set_revs_limit(_Db, _Limit) ->
     throw(invalid_revs_limit).

+
 name(#db{name=Name}) ->
     Name.

-compression(#db{compression=Compression}) ->
-    Compression.
-
 update_doc(Db, Doc, Options) ->
     update_doc(Db, Doc, Options, interactive_edit).

@@ -683,6 +669,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
     gen_server:cast(Pid, {load_validation_funs, Funs}),
     Funs.

+reload_validation_funs(#db{} = Db) ->
+    gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).
+
 prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
         OldFullDocInfo, LeafRevsDict, AllowConflict) ->
     case Revs of
@@ -749,7 +738,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
     prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
             [PreppedBucket | AccPrepped], AccErrors3);
 prep_and_validate_updates(Db, [DocBucket|RestBuckets],
-        [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+        [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
     Leafs = couch_key_tree:get_all_leafs(OldRevTree),
     LeafRevsDict = dict:from_list([
@@ -800,13 +789,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         end, {[], AccErrors}, Bucket),
         prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
-    {ok, #full_doc_info{rev_tree=OldTree}} ->
+    #full_doc_info{rev_tree=OldTree} ->
+        RevsLimit = get_revs_limit(Db),
         OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
         OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
         NewRevTree = lists:foldl(
             fun(NewDoc, AccTree) ->
                 {NewTree, _} = couch_key_tree:merge(AccTree,
-                    couch_doc:to_path(NewDoc), Db#db.revs_limit),
+                    couch_doc:to_path(NewDoc), RevsLimit),
                 NewTree
             end,
             OldTree, Bucket),
@@ -942,7 +932,7 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+    DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc))
             || Doc <- Bucket] || Bucket <- DocBuckets3],
    {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -996,8 +986,8 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
         Options2 = if AllOrNothing -> [merge_conflicts];
                 true -> [] end ++ Options,
         DocBuckets3 = [[
-                doc_flush_atts(set_new_att_revpos(
-                        check_dup_atts(Doc)), Db#db.fd)
+                doc_flush_atts(Db, set_new_att_revpos(
+                        check_dup_atts(Doc)))
                 || Doc <- B] || B <- DocBuckets2],
         {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),

@@ -1081,7 +1071,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
             % compaction. Retry by reopening the db and writing to the current file
             {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
             DocBuckets2 = [
-                [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
+                [doc_flush_atts(Db2, Doc) || Doc <- Bucket] ||
                 Bucket <- DocBuckets1
             ],
             % We only retry once
@@ -1100,18 +1090,24 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,

 prepare_doc_summaries(Db, BucketList) ->
     [lists:map(
-        fun(#doc{body = Body, atts = Atts} = Doc) ->
+        fun(#doc{atts = Atts} = Doc0) ->
             DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
             {ok, SizeInfo} = couch_att:size_info(Atts),
-            AttsFd = case Atts of
-            [Att | _] ->
-                {Fd, _} = couch_att:fetch(data, Att),
-                Fd;
-            [] ->
-                nil
+            AttsStream = case Atts of
+                [Att | _] ->
+                    {stream, StreamEngine} = couch_att:fetch(data, Att),
+                    StreamEngine;
+                [] ->
+                    nil
             end,
-            SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
-            Doc#doc{body = {summary, SummaryChunk, SizeInfo, AttsFd}}
+            Doc1 = Doc0#doc{
+                atts = DiskAtts,
+                meta = [
+                    {size_info, SizeInfo},
+                    {atts_stream, AttsStream}
+                ] ++ Doc0#doc.meta
+            },
+            couch_db_engine:serialize_doc(Db, Doc1)
         end,
         Bucket) || Bucket <- BucketList].

@@ -1136,12 +1132,8 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
     Doc#doc{atts = Atts}.

-doc_flush_atts(Doc, Fd) ->
-    Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
+doc_flush_atts(Db, Doc) ->
+    Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.

 compressible_att_type(MimeType) when is_binary(MimeType) ->
@@ -1171,21 +1163,24 @@ compressible_att_type(MimeType) ->
 % is present in the request, but there is no Content-MD5
 % trailer, we're free to ignore this inconsistency and
 % pretend that no Content-MD5 exists.
-with_stream(Fd, Att, Fun) ->
+with_stream(Db, Att, Fun) ->
     [InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
     BufferSize = list_to_integer(
         config:get("couchdb", "attachment_stream_buffer_size", "4096")),
-    {ok, OutputStream} = case (Enc =:= identity) andalso
-        compressible_att_type(Type) of
-    true ->
-        CompLevel = list_to_integer(
-            config:get("attachments", "compression_level", "0")
-        ),
-        couch_stream:open(Fd, [{buffer_size, BufferSize},
-            {encoding, gzip}, {compression_level, CompLevel}]);
-    _ ->
-        couch_stream:open(Fd, [{buffer_size, BufferSize}])
+    Options = case (Enc =:= identity) andalso compressible_att_type(Type) of
+        true ->
+            CompLevel = list_to_integer(
+                config:get("attachments", "compression_level", "0")
+            ),
+            [
+                {buffer_size, BufferSize},
+                {encoding, gzip},
+                {compression_level, CompLevel}
+            ];
+        _ ->
+            [{buffer_size, BufferSize}]
     end,
+    {ok, OutputStream} = open_write_stream(Db, Options),
     ReqMd5 = case Fun(OutputStream) of
         {md5, FooterMd5} ->
             case InMd5 of
@@ -1195,9 +1190,9 @@
         _ ->
             InMd5
     end,
-    {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+    {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
         couch_stream:close(OutputStream),
-    check_md5(IdentityMd5, ReqMd5),
+    couch_util:check_md5(IdentityMd5, ReqMd5),
     {AttLen, DiskLen, NewEnc} = case Enc of
     identity ->
         case {Md5, IdentityMd5} of
@@ -1219,7 +1214,7 @@
         end
     end,
     couch_att:store([
-        {data, {Fd,StreamInfo}},
+        {data, {stream, StreamEngine}},
         {att_len, AttLen},
         {disk_len, DiskLen},
         {md5, Md5},
@@ -1227,89 +1222,101 @@
     ], Att).
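with_stream/3 now takes the #db{} handle and obtains its output stream from open_write_stream/2, so attachment writes are engine-agnostic. The binary clause of flush_data/3 earlier in this diff is the canonical caller; schematically (Data is assumed to be the attachment body as a binary):

    %% Write Data as the attachment's new body; the engine supplies the
    %% stream, and with_stream/3 handles MD5 checking and att metadata.
    NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
        couch_stream:write(OutputStream, Data)
    end).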
-enum_docs_since_reduce_to_count(Reds) ->
-    couch_btree:final_reduce(
-            fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+open_write_stream(Db, Options) ->
+    couch_db_engine:open_write_stream(Db, Options).

-enum_docs_reduce_to_count(Reds) ->
-    FinalRed = couch_btree:final_reduce(
-            fun couch_db_updater:btree_by_id_reduce/2, Reds),
-    element(1, FinalRed).

-changes_since(Db, StartSeq, Fun, Acc) ->
-    changes_since(Db, StartSeq, Fun, [], Acc).
+open_read_stream(Db, AttState) ->
+    couch_db_engine:open_read_stream(Db, AttState).

-changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
-    changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
-changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
-    Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
-        DocInfo = case FullDocInfo of
-            #full_doc_info{} ->
-                couch_doc:to_doc_info(FullDocInfo);
-            #doc_info{} ->
-                FullDocInfo
-        end,
-        Fun(DocInfo, Acc2)
-    end,
-    {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
-        Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
-    {ok, AccOut}.

-count_changes_since(Db, SinceSeq) ->
-    BTree = Db#db.seq_tree,
-    {ok, Changes} =
-    couch_btree:fold_reduce(BTree,
-        fun(_SeqStart, PartialReds, 0) ->
-            {ok, couch_btree:final_reduce(BTree, PartialReds)}
-        end,
-        0, [{start_key, SinceSeq + 1}]),
-    Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
-    {ok, LastReduction, AccOut} = couch_btree:fold(
-        Db#db.seq_tree, InFun, Acc,
-        [{start_key, SinceSeq + 1} | Options]),
-    {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options0) ->
-    {NS, Options} = extract_namespace(Options0),
-    enum_docs(Db, NS, InFun, InAcc, Options).
-
-enum_docs(Db, undefined, InFun, InAcc, Options) ->
-    FoldFun = pipe([fun skip_deleted/4], InFun),
-    {ok, LastReduce, OutAcc} = couch_btree:fold(
-        Db#db.id_tree, FoldFun, InAcc, Options),
-    {ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
-enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
-    FoldFun = pipe([fun skip_deleted/4], InFun),
-    {ok, _LastReduce, OutAcc} = couch_btree:fold(
-        Db#db.local_tree, FoldFun, InAcc, Options),
-    {ok, 0, OutAcc};
-enum_docs(Db, NS, InFun, InAcc, Options0) ->
-    FoldFun = pipe([
-        fun skip_deleted/4,
-        stop_on_leaving_namespace(NS)], InFun),
-    Options = set_namespace_range(Options0, NS),
-    {ok, LastReduce, OutAcc} = couch_btree:fold(
-        Db#db.id_tree, FoldFun, InAcc, Options),
-    {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
-
-extract_namespace(Options0) ->
-    case proplists:split(Options0, [namespace]) of
-        {[[{namespace, NS}]], Options} ->
-            {NS, Options};
-        {_, Options} ->
-            {undefined, Options}
+is_active_stream(Db, StreamEngine) ->
+    couch_db_engine:is_active_stream(Db, StreamEngine).
+
+
+fold_docs(Db, Fun, Acc) ->
+    fold_docs(Db, Fun, Acc, []).
+
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+    case lists:keyfind(namespace, 1, Options) of
+        {namespace, <<"_design">>} ->
+            fold_design_docs(Db, UserFun, UserAcc, Options);
+        {namespace, <<"_local">>} ->
+            fold_local_docs(Db, UserFun, UserAcc, Options);
+        _Else ->
+            fold_all_docs(Db, UserFun, UserAcc, Options)
     end.

+
+fold_changes(Db, StartSeq, Fun, Acc) ->
+    fold_changes(Db, StartSeq, Fun, Acc, []).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
+    Fun = get_doc_type_conv(Opts),
+    Acc1 = {Db, UserFun, UserAcc},
+    {ok, Acc2} = couch_db_engine:fold_changes(Db, StartSeq, Fun, Acc1, Opts),
+    {_, _, FinalUserAcc} = Acc2,
+    {ok, FinalUserAcc}.
+
+
+count_changes_since(Db, SinceSeq) ->
+    couch_db_engine:count_changes_since(Db, SinceSeq).
+
+
 %%% Internal function %%%
+
+fold_all_docs(Db, UserFun, UserAcc, Options) ->
+    % FIXME: THIS IS A HUGE HACK
+    % We'll have to figure out a different implementation
+    % for the _all_docs handler which is the only thing that
+    % uses include_reductions.
+    case lists:member(include_reductions, Options) of
+        true ->
+            couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options);
+        false ->
+            Fun = get_doc_type_conv(Options),
+            Acc1 = {Db, UserFun, UserAcc},
+            {ok, Acc2} = couch_db_engine:fold_docs(Db, Fun, Acc1, Options),
+            {_, _, FinalUserAcc} = Acc2,
+            {ok, FinalUserAcc}
+    end.
+
+
+fold_design_docs(Db, UserFun, UserAcc, Options1) ->
+    Options2 = set_design_doc_keys(Options1),
+
+    % FIXME: Same as above. couch_mrview is doing
+    % terribleness here.
+    case lists:member(include_reductions, Options2) of
+        true ->
+            couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2);
+        false ->
+            Fun1 = get_doc_type_conv(Options1),
+            Fun2 = fun only_ddoc_fold/2,
+            Acc1 = {Fun1, {Db, UserFun, UserAcc}},
+            {ok, Acc2} = couch_db_engine:fold_docs(Db, Fun2, Acc1, Options2),
+            {_, {_, _, FinalUserAcc}} = Acc2,
+            {ok, FinalUserAcc}
+    end.
+
+
+fold_local_docs(Db, UserFun, UserAcc, Options) ->
+    Fun = get_doc_type_conv(Options),
+    Acc1 = {Fun, {Db, UserFun, UserAcc}},
+    {ok, Acc2} = couch_db_engine:fold_local_docs(Db, Fun, Acc1, Options),
+    {_, {_, _, FinalUserAcc}} = Acc2,
+    {ok, FinalUserAcc}.
+
+
 open_doc_revs_int(Db, IdRevs, Options) ->
     Ids = [Id || {Id, _Revs} <- IdRevs],
     LookupResults = get_full_doc_infos(Db, Ids),
     lists:zipwith(
         fun({Id, Revs}, Lookup) ->
             case Lookup of
-            {ok, #full_doc_info{rev_tree=RevTree}} ->
+            #full_doc_info{rev_tree=RevTree} ->
                 {FoundRevs, MissingRevs} =
                 case Revs of
                 all ->
@@ -1343,9 +1350,8 @@ open_doc_revs_int(Db, IdRevs, Options) ->
         IdRevs, LookupResults).

 open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
-    case couch_btree:lookup(Db#db.local_tree, [Id]) of
-    [{ok, {_, {Rev, BodyData}}}] ->
-        Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+    case couch_db_engine:open_local_docs(Db, [Id]) of
+    [#doc{} = Doc] ->
         apply_open_options({ok, Doc}, Options);
     [not_found] ->
         {not_found, missing}
@@ -1364,7 +1370,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
         {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
 open_doc_int(Db, Id, Options) ->
     case get_full_doc_info(Db, Id) of
-    {ok, FullDocInfo} ->
+    #full_doc_info{} = FullDocInfo ->
         open_doc_int(Db, FullDocInfo, Options);
     not_found ->
         {not_found, missing}
@@ -1410,8 +1416,8 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
     true ->
         [{local_seq, Seq}]
     end.

-read_doc(#db{fd=Fd}, Pos) ->
-    couch_file:pread_term(Fd, Pos).
+read_doc(#db{} = Db, Ptr) ->
+    couch_db_engine:read_doc(Db, Ptr).
 make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
@@ -1422,34 +1428,31 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
         atts = [],
         deleted = Deleted
     };
-make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) ->
-    {BodyData, Atts0} = case Bp of
-        nil ->
-            {[], []};
-        _ ->
-            case read_doc(Db, Bp) of
-                {ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
-                    {BodyData0, couch_compress:decompress(Atts1)};
-                {ok, {BodyData0, Atts1}} when is_list(Atts1) ->
-                    % pre 1.2 format
-                    {BodyData0, Atts1}
-            end
-    end,
-    Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
-    Doc = #doc{
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+    Doc0 = couch_db_engine:read_doc_body(Db, #doc{
         id = Id,
         revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
-        body = BodyData,
-        atts = Atts,
+        body = Bp,
         deleted = Deleted
-    },
-    after_doc_read(Db, Doc).
+    }),
+    Doc1 = case Doc0#doc.atts of
+        BinAtts when is_binary(BinAtts) ->
+            Doc0#doc{
+                atts = couch_compress:decompress(BinAtts)
+            };
+        ListAtts when is_list(ListAtts) ->
+            Doc0
+    end,
+    after_doc_read(Db, Doc1#doc{
+        atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
+    }).

 after_doc_read(#db{} = Db, Doc) ->
     DocWithBody = couch_doc:with_ejson_body(Doc),
     couch_db_plugin:after_doc_read(Db, DocWithBody).

+
 increment_stat(#db{options = Options}, Stat) ->
     case lists:member(sys_db, Options) of
     true ->
@@ -1458,71 +1461,6 @@ increment_stat(#db{options = Options}, Stat) ->
         couch_stats:increment_counter(Stat)
     end.

-skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
-    {skip, LK, Reds, Acc};
-skip_deleted(Case, A, B, C) ->
-    {Case, A, B, C}.
-
-stop_on_leaving_namespace(NS) ->
-    fun
-        (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
-            case has_prefix(Key, NS) of
-                true ->
-                    {visit, FullInfo, Reds, Acc};
-                false ->
-                    {stop, FullInfo, Reds, Acc}
-            end;
-        (Case, KV, Reds, Acc) ->
-            {Case, KV, Reds, Acc}
-    end.
-
-has_prefix(Bin, Prefix) ->
-    S = byte_size(Prefix),
-    case Bin of
-        <<Prefix:S/binary, "/", _/binary>> ->
-            true;
-        _Else ->
-            false
-    end.
-
-pipe(Filters, Final) ->
-    Wrap =
-        fun
-            (visit, KV, Reds, Acc) ->
-                Final(KV, Reds, Acc);
-            (skip, _KV, _Reds, Acc) ->
-                {skip, Acc};
-            (stop, _KV, _Reds, Acc) ->
-                {stop, Acc};
-            (traverse, _, _, Acc) ->
-                {ok, Acc}
-        end,
-    do_pipe(Filters, Wrap).
-
-do_pipe([], Fun) -> Fun;
-do_pipe([Filter|Rest], F0) ->
-    F1 = fun(C0, KV0, Reds0, Acc0) ->
-        {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
-        F0(C, KV, Reds, Acc)
-    end,
-    do_pipe(Rest, F1).
-
-set_namespace_range(Options, undefined) -> Options;
-set_namespace_range(Options, NS) ->
-    %% FIXME depending on order we might need to swap keys
-    SK = select_gt(
-           proplists:get_value(start_key, Options, <<"">>),
-           <<NS/binary, "/">>),
-    EK = select_lt(
-           proplists:get_value(end_key, Options, <<NS/binary, "0">>),
-           <<NS/binary, "0">>),
-    [{start_key, SK}, {end_key_gt, EK}].
-
-select_gt(V1, V2) when V1 < V2 -> V2;
-select_gt(V1, _V2) -> V1.
-
-select_lt(V1, V2) when V1 > V2 -> V2;
-select_lt(V1, _V2) -> V1.

 -spec normalize_dbname(list() | binary()) -> binary().
@@ -1562,6 +1500,117 @@ is_systemdb(DbName) when is_list(DbName) ->
 is_systemdb(DbName) when is_binary(DbName) ->
     lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES).

+
+get_doc_type_conv(Options) ->
+    lists:foldl(fun(Opt, Acc) ->
+        case Opt of
+            doc ->
+                fun conv_to_doc/2;
+            doc_info ->
+                fun conv_to_doc_info/2;
+            full_doc_info ->
+                fun conv_to_full_doc_info/2;
+            _ ->
+                Acc
+        end
+    end, fun conv_to_full_doc_info/2, Options).
+
+
+conv_to_doc(#full_doc_info{}=FDI, {Db, UserFun, UserAcc}) ->
+    #full_doc_info{
+        id = Id,
+        rev_tree = RevTree
+    } = FDI,
+    #doc_info{
+        revs = [#rev_info{deleted = IsDeleted, rev = Rev, body_sp = Bp} | _]
+    } = couch_doc:to_doc_info(FDI),
+    {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
+    {Go, NewUserAcc} = UserFun(Doc, UserAcc),
+    {Go, {Db, UserFun, NewUserAcc}}.
+
+
+conv_to_doc_info(#full_doc_info{} = FDI, {Db, UserFun, UserAcc}) ->
+    DocInfo = couch_doc:to_doc_info(FDI),
+    {Go, NewUserAcc} = UserFun(DocInfo, UserAcc),
+    {Go, {Db, UserFun, NewUserAcc}}.
+
+
+conv_to_full_doc_info(#full_doc_info{} = FDI, {Db, UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(FDI, UserAcc),
+    {Go, {Db, UserFun, NewUserAcc}}.
+
+
+set_design_doc_keys(Options1) ->
+    Dir = case lists:keyfind(dir, 1, Options1) of
+        {dir, D0} -> D0;
+        _ -> fwd
+    end,
+    Options2 = set_design_doc_start_key(Options1, Dir),
+    set_design_doc_end_key(Options2, Dir).
+
+
+-define(FIRST_DDOC_KEY, <<"_design/">>).
+-define(LAST_DDOC_KEY, <<"_design0">>).
+
+
+set_design_doc_start_key(Options, fwd) ->
+    Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
+    Key2 = case Key1 < ?FIRST_DDOC_KEY of
+        true -> ?FIRST_DDOC_KEY;
+        false -> Key1
+    end,
+    lists:keystore(start_key, 1, Options, {start_key, Key2});
+set_design_doc_start_key(Options, rev) ->
+    Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
+    Key2 = case Key1 > ?LAST_DDOC_KEY of
+        true -> ?LAST_DDOC_KEY;
+        false -> Key1
+    end,
+    lists:keystore(start_key, 1, Options, {start_key, Key2}).
+
+
+set_design_doc_end_key(Options, fwd) ->
+    case couch_util:get_value(end_key_gt, Options) of
+        undefined ->
+            Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+            Key2 = case Key1 > ?LAST_DDOC_KEY of
+                true -> ?LAST_DDOC_KEY;
+                false -> Key1
+            end,
+            lists:keystore(end_key, 1, Options, {end_key, Key2});
+        EKeyGT ->
+            Key2 = case EKeyGT > ?LAST_DDOC_KEY of
+                true -> ?LAST_DDOC_KEY;
+                false -> EKeyGT
+            end,
+            lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+    end;
+set_design_doc_end_key(Options, rev) ->
+    case couch_util:get_value(end_key_gt, Options) of
+        undefined ->
+            Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+            Key2 = case Key1 < ?FIRST_DDOC_KEY of
+                true -> ?FIRST_DDOC_KEY;
+                false -> Key1
+            end,
+            lists:keystore(end_key, 1, Options, {end_key, Key2});
+        EKeyGT ->
+            Key2 = case EKeyGT < ?FIRST_DDOC_KEY of
+                true -> ?FIRST_DDOC_KEY;
+                false -> EKeyGT
+            end,
+            lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+    end.
+
+
+only_ddoc_fold(#full_doc_info{id = <<"_design/", _/binary>>}=FDI, {Fun, Acc}) ->
+    {Go, NewAcc} = Fun(FDI, Acc),
+    {Go, {Fun, NewAcc}};
+only_ddoc_fold(_, _) ->
+    erlang:error(invalid_design_doc_fold).
+
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_db_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_engine.erl b/src/couch_db_engine.erl
index d9ccd18..f22c66f 100644
--- a/src/couch_db_engine.erl
+++ b/src/couch_db_engine.erl
@@ -475,9 +475,6 @@
         {ok, CompactedDbHandle::db_handle()}.

--include("couch_db_int.hrl").
-
-
 -export([
     exists/2,
     delete/4,
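Taken together, the new fold API replaces enum_docs/4,5: callers choose a namespace via the options list, and get_doc_type_conv/1 picks the term handed to their fold fun (doc, doc_info, or the default full_doc_info). A closing sketch that collects design-document ids, assuming an open Db handle:

    %% Fold only the _design namespace; the fold fun receives
    %% #full_doc_info{} records by default.
    FoldFun = fun(#full_doc_info{id = Id}, Acc) -> {ok, [Id | Acc]} end,
    Opts = [{namespace, <<"_design">>}],
    {ok, DDocIds} = couch_db:fold_docs(Db, FoldFun, [], Opts).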