From: rnewson@apache.org
To: commits@couchdb.apache.org
Reply-To: dev@couchdb.apache.org
Date: Tue, 06 May 2014 12:41:00 -0000
Message-Id: <61a71c56ac0341059de64c452026f8f4@git.apache.org>
In-Reply-To: <7da1be4b517f431ca800b5b42fe89a88@git.apache.org>
References: <7da1be4b517f431ca800b5b42fe89a88@git.apache.org>
Subject: [5/6] Merge remote-tracking branch 'origin/import-master'

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --cc src/couch_db_updater.erl
index 649826a,0000000..901e8c3
mode 100644,000000..100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@@ -1,1264 -1,0 +1,1269 @@@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
+-export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
+-export([make_doc_summary/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(comp_header, {
+    db_header,
+    meta_state
+}).
+
+-record(merge_st, {
+    id_tree,
+    seq_tree,
+    curr,
+    rem_seqs,
+    infos
+}).
+ +init({DbName, Filepath, Fd, Options}) -> + case lists:member(create, Options) of + true -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, Header), + % delete any old compaction files that might be hanging around + RootDir = config:get("couchdb", "database_dir", "."), + couch_file:delete(RootDir, Filepath ++ ".compact"), + couch_file:delete(RootDir, Filepath ++ ".compact.data"), + couch_file:delete(RootDir, Filepath ++ ".compact.meta"); + false -> + case couch_file:read_header(Fd) of + {ok, Header} -> + ok; + no_valid_header -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, Header), + % delete any old compaction files that might be hanging around + file:delete(Filepath ++ ".compact"), + file:delete(Filepath ++ ".compact.data"), + file:delete(Filepath ++ ".compact.meta") + end + end, + Db = init_db(DbName, Filepath, Fd, Header, Options), + % we don't load validation funs here because the fabric query is liable to + % race conditions. Instead see couch_db:validate_doc_update, which loads + % them lazily + {ok, Db#db{main_pid = self()}}. + + +terminate(_Reason, Db) -> + % If the reason we died is becuase our fd disappeared + % then we don't need to try closing it again. + case Db#db.fd of + Pid when is_pid(Pid) -> + ok = couch_file:close(Db#db.fd); + _ -> + ok + end, + couch_util:shutdown_sync(Db#db.compactor_pid), + couch_util:shutdown_sync(Db#db.fd), + ok. + +handle_call(get_db, _From, Db) -> + {reply, {ok, Db}, Db}; +handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> + {reply, ok, Db}; % no data waiting, return ok immediately +handle_call(full_commit, _From, Db) -> + {reply, ok, commit_data(Db)}; +handle_call({full_commit, RequiredSeq}, _From, Db) + when RequiredSeq =< Db#db.committed_update_seq -> + {reply, ok, Db}; +handle_call({full_commit, _}, _, Db) -> + {reply, ok, commit_data(Db)}; % commit the data and return ok +handle_call(start_compact, _From, Db) -> + {noreply, NewDb} = handle_cast(start_compact, Db), + {reply, {ok, NewDb#db.compactor_pid}, NewDb}; +handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) -> + {reply, Pid, Db}; +handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) -> + {reply, ok, Db}; +handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> + unlink(Pid), + exit(Pid, kill), + RootDir = config:get("couchdb", "database_dir", "."), + ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"), + Db2 = Db#db{compactor_pid = nil}, + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {reply, ok, Db2}; +handle_call(increment_update_seq, _From, Db) -> + Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + couch_db_update_notifier:notify({updated, Db#db.name}), + {reply, {ok, Db2#db.update_seq}, Db2}; + +handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) -> + {ok, Ptr, _} = couch_file:append_term( + Db#db.fd, NewSec, [{compression, Comp}]), + Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {reply, ok, Db2}; + +handle_call({set_revs_limit, Limit}, _From, Db) -> + Db2 = commit_data(Db#db{revs_limit=Limit, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {reply, ok, Db2}; + +handle_call({purge_docs, _IdRevs}, _From, 
+ #db{compactor_pid=Pid}=Db) when Pid /= nil -> + {reply, {error, purge_during_compaction}, Db}; +handle_call({purge_docs, IdRevs}, _From, Db) -> + #db{ + fd = Fd, + id_tree = DocInfoByIdBTree, + seq_tree = DocInfoBySeqBTree, + update_seq = LastSeq, + header = Header = #db_header{purge_seq=PurgeSeq}, + compression = Comp + } = Db, + DocLookups = couch_btree:lookup(DocInfoByIdBTree, + [Id || {Id, _Revs} <- IdRevs]), + + NewDocInfos = lists:zipwith( + fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) -> + case couch_key_tree:remove_leafs(Tree, Revs) of + {_, []=_RemovedRevs} -> % no change + nil; + {NewTree, RemovedRevs} -> + {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs} + end; + (_, not_found) -> + nil + end, + IdRevs, DocLookups), + + SeqsToRemove = [Seq + || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos], + + FullDocInfoToUpdate = [FullInfo + || {#full_doc_info{rev_tree=Tree}=FullInfo,_} + <- NewDocInfos, Tree /= []], + + IdRevsPurged = [{Id, Revs} + || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], + + {DocInfoToUpdate, NewSeq} = lists:mapfoldl( + fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) -> + Tree2 = couch_key_tree:map_leafs( + fun(_RevId, Leaf) -> + Leaf#leaf{seq=SeqAcc+1} + end, Tree), + {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1} + end, LastSeq, FullDocInfoToUpdate), + + IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_} + <- NewDocInfos], + + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, + DocInfoToUpdate, SeqsToRemove), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, + FullDocInfoToUpdate, IdsToRemove), + {ok, Pointer, _} = couch_file:append_term( + Fd, IdRevsPurged, [{compression, Comp}]), + + Db2 = commit_data( + Db#db{ + id_tree = DocInfoByIdBTree2, + seq_tree = DocInfoBySeqBTree2, + update_seq = NewSeq + 1, + header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), + + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + couch_db_update_notifier:notify({updated, Db#db.name}), + {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}. 
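For orientation, the purge clause above is driven by a plain gen_server call into the updater process; the {error, purge_during_compaction} branch is why callers may need to retry once a compaction finishes. A minimal sketch of such a caller, assuming a DbPid for the updater and a single {Pos, RevId} revision; in CouchDB itself this path is wrapped by couch_db rather than called directly:

    -module(purge_example).
    -export([purge_one/3]).

    %% Hypothetical driver for the {purge_docs, IdRevs} clause above.
    %% DbPid is assumed to be the couch_db_updater process for the database.
    purge_one(DbPid, DocId, {_Pos, _RevIdBin} = Rev) ->
        case gen_server:call(DbPid, {purge_docs, [{DocId, [Rev]}]}, infinity) of
            {ok, NewPurgeSeq, PurgedIdRevs} ->
                %% PurgedIdRevs lists the {Id, Revs} pairs actually removed
                {ok, NewPurgeSeq, PurgedIdRevs};
            {error, purge_during_compaction} = Error ->
                %% purging is refused while a compactor is running; retry later
                Error
        end.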
+ + +handle_cast({load_validation_funs, ValidationFuns}, Db) -> + Db2 = Db#db{validate_doc_funs = ValidationFuns}, + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2}; +handle_cast(start_compact, Db) -> + case Db#db.compactor_pid of + nil -> + ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]), + Pid = spawn_link(fun() -> start_copy_compact(Db) end), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2}; + _ -> + % compact currently running, this is a no-op + {noreply, Db} + end; +handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> + {ok, NewFd} = couch_file:open(CompactFilepath), + {ok, NewHeader} = couch_file:read_header(NewFd), + #db{update_seq=NewSeq} = NewDb = + init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options), + unlink(NewFd), + case Db#db.update_seq == NewSeq of + true -> + % suck up all the local docs into memory and write them to the new db + {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs), + + NewDb2 = commit_data(NewDb#db{ + local_tree = NewLocalBtree, + main_pid = self(), + filepath = Filepath, + instance_start_time = Db#db.instance_start_time, + revs_limit = Db#db.revs_limit + }), + + ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", + [Filepath, CompactFilepath]), + ok = file:rename(CompactFilepath, Filepath ++ ".compact"), + RootDir = config:get("couchdb", "database_dir", "."), + couch_file:delete(RootDir, Filepath), + ok = file:rename(Filepath ++ ".compact", Filepath), + % Delete the old meta compaction file after promoting + % the compaction file. + couch_file:delete(RootDir, Filepath ++ ".compact.meta"), + close_db(Db), + NewDb3 = refresh_validate_doc_funs(NewDb2), + ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), + couch_db_update_notifier:notify({compacted, NewDb3#db.name}), + ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]), + {noreply, NewDb3#db{compactor_pid=nil}}; + false -> + ?LOG_INFO("Compaction file still behind main file " + "(update seq=~p. compact update seq=~p). Retrying.", + [Db#db.update_seq, NewSeq]), + close_db(NewDb), + Pid = spawn_link(fun() -> start_copy_compact(Db) end), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2} + end; + +handle_cast(Msg, #db{name = Name} = Db) -> + ?LOG_ERROR("Database `~s` updater received unexpected cast: ~p", [Name, Msg]), + {stop, Msg, Db}. + + +handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, + FullCommit}, Db) -> + GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs], + if NonRepDocs == [] -> + {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2, + [Client], MergeConflicts, FullCommit); + true -> + GroupedDocs3 = GroupedDocs2, + FullCommit2 = FullCommit, + Clients = [Client] + end, + NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs], + try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, + FullCommit2) of + {ok, Db2, UpdatedDDocIds} -> + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + if Db2#db.update_seq /= Db#db.update_seq -> + couch_db_update_notifier:notify({updated, Db2#db.name}); + true -> ok + end, + [catch(ClientPid ! 
{done, self()}) || ClientPid <- Clients], + lists:foreach(fun(DDocId) -> + couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}}) + end, UpdatedDDocIds), + {noreply, Db2, hibernate} + catch + throw: retry -> + [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients], + {noreply, Db, hibernate} + end; +handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) -> + %no outstanding delayed commits, ignore + {noreply, Db}; +handle_info(delayed_commit, Db) -> + case commit_data(Db) of + Db -> + {noreply, Db}; + Db2 -> + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2} + end; +handle_info({'EXIT', _Pid, normal}, Db) -> + {noreply, Db}; +handle_info({'EXIT', _Pid, Reason}, Db) -> + {stop, Reason, Db}; +handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> + ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]), + {stop, normal, Db#db{fd=undefined, fd_monitor=undefined}}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +merge_updates([[{_,{#doc{id=X},_}}|_]=A|RestA], [[{_,{#doc{id=X},_}}|_]=B|RestB]) -> + [A++B | merge_updates(RestA, RestB)]; +merge_updates([[{_,{#doc{id=X},_}}|_]|_]=A, [[{_,{#doc{id=Y},_}}|_]|_]=B) when X < Y -> + [hd(A) | merge_updates(tl(A), B)]; +merge_updates([[{_,{#doc{id=X},_}}|_]|_]=A, [[{_,{#doc{id=Y},_}}|_]|_]=B) when X > Y -> + [hd(B) | merge_updates(A, tl(B))]; +merge_updates([], RestB) -> + RestB; +merge_updates(RestA, []) -> + RestA. + +collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> + receive + % Only collect updates with the same MergeConflicts flag and without + % local docs. It's easier to just avoid multiple _local doc + % updaters than deal with their possible conflicts, and local docs + % writes are relatively rare. Can be optmized later if really needed. + {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} -> + GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup] + || DocGroup <- GroupedDocs], + GroupedDocsAcc2 = + merge_updates(GroupedDocsAcc, GroupedDocs2), + collect_updates(GroupedDocsAcc2, [Client | ClientsAcc], + MergeConflicts, (FullCommit or FullCommit2)) + after 0 -> + {GroupedDocsAcc, ClientsAcc, FullCommit} + end. + +rev_tree(DiskTree) -> + couch_key_tree:mapfold(fun + (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, leaf, _Acc) -> + % pre 1.2 format, will be upgraded on compaction + {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq}, nil}; + (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, branch, Acc) -> + {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq}, Acc}; + (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, leaf, Acc) -> + Acc2 = sum_leaf_sizes(Acc, Size), + {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq, size=Size}, Acc2}; + (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, branch, Acc) -> + {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq, size=Size}, Acc}; + (_RevId, ?REV_MISSING, _Type, Acc) -> + {?REV_MISSING, Acc} + end, 0, DiskTree). + +disk_tree(RevTree) -> + couch_key_tree:map(fun + (_RevId, ?REV_MISSING) -> + ?REV_MISSING; + (_RevId, #leaf{deleted=IsDeleted, ptr=BodyPointer, seq=UpdateSeq, size=Size}) -> + {?b2i(IsDeleted), BodyPointer, UpdateSeq, Size} + end, RevTree). + +btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) -> + {Seq, {Id, ?b2i(Del), disk_tree(T)}}. 
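collect_updates/4 above drains whatever compatible update_docs messages are already queued in the mailbox (receive ... after 0) so that one commit can cover several writers. The same non-blocking batching idiom in isolation; the {job, _} message shape and module name are invented for the sketch:

    -module(batch_example).
    -export([drain/1]).

    %% Collect every {job, _} message that is already waiting, without
    %% blocking; returns the jobs in arrival order.
    drain(Acc) ->
        receive
            {job, Job} ->
                drain([Job | Acc])
        after 0 ->
            lists:reverse(Acc)
        end.

A worker that calls drain([FirstJob]) right after receiving its first job therefore folds the queued backlog into a single pass, which is what the updater does before writing.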
+ +btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) -> + {RevTree, LeafsSize} = rev_tree(DiskTree), + #full_doc_info{ + id = Id, + update_seq = Seq, + deleted = ?i2b(Del), + rev_tree = RevTree, + leafs_size = LeafsSize + }; +btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> + % Older versions stored #doc_info records in the seq_tree. + % Compact to upgrade. + #doc_info{ + id = Id, + high_seq=KeySeq, + revs = + [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} || + {Rev, Seq, Bp} <- RevInfos] ++ + [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || + {Rev, Seq, Bp} <- DeletedRevInfos]}. + +btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, + deleted=Deleted, rev_tree=Tree}) -> + {Id, {Seq, ?b2i(Deleted), disk_tree(Tree)}}. + +btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> + {Tree, LeafsSize} = rev_tree(DiskTree), + #full_doc_info{ + id = Id, + update_seq = HighSeq, + deleted = ?i2b(Deleted), + rev_tree = Tree, + leafs_size = LeafsSize + }. + +btree_by_id_reduce(reduce, FullDocInfos) -> + lists:foldl( + fun(Info, {NotDeleted, Deleted, Size}) -> + Size2 = sum_leaf_sizes(Size, Info#full_doc_info.leafs_size), + case Info#full_doc_info.deleted of + true -> + {NotDeleted, Deleted + 1, Size2}; + false -> + {NotDeleted + 1, Deleted, Size2} + end + end, + {0, 0, 0}, FullDocInfos); +btree_by_id_reduce(rereduce, Reds) -> + lists:foldl( + fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSize}) -> + % pre 1.2 format, will be upgraded on compaction + {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil}; + ({NotDeleted, Deleted, Size}, {AccNotDeleted, AccDeleted, AccSize}) -> + AccSize2 = sum_leaf_sizes(AccSize, Size), + {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSize2} + end, + {0, 0, 0}, Reds). + +sum_leaf_sizes(nil, _) -> + nil; +sum_leaf_sizes(_, nil) -> + nil; +sum_leaf_sizes(Size1, Size2) -> + Size1 + Size2. + +btree_by_seq_reduce(reduce, DocInfos) -> + % count the number of documents + length(DocInfos); +btree_by_seq_reduce(rereduce, Reds) -> + lists:sum(Reds). + +simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) -> + OldSz = tuple_size(Old), + NewValuesTail = + lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz), + list_to_tuple(tuple_to_list(Old) ++ NewValuesTail); +simple_upgrade_record(Old, _New) -> + Old. + +-define(OLD_DISK_VERSION_ERROR, + "Database files from versions smaller than 0.10.0 are no longer supported"). 
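simple_upgrade_record/2 above pads a record tuple written by an older release with the default values of the current record definition, relying only on tuple_size/1 and lists:sublist/3. A standalone illustration of the same trick; the #hdr{} record is invented for the example and is not the real #db_header{}:

    -module(upgrade_example).
    -export([demo/0]).

    -record(hdr, {version = 6, update_seq = 0, purge_seq = 0}).

    %% Same shape as simple_upgrade_record/2: when the stored tuple is
    %% shorter than the current definition, append the missing defaults.
    upgrade(Old, New) when tuple_size(Old) < tuple_size(New) ->
        OldSz = tuple_size(Old),
        Tail = lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz),
        list_to_tuple(tuple_to_list(Old) ++ Tail);
    upgrade(Old, _New) ->
        Old.

    demo() ->
        %% pretend an older release wrote a header without the purge_seq field
        OldHeader = {hdr, 5, 42},
        #hdr{version = 5, update_seq = 42, purge_seq = 0} = upgrade(OldHeader, #hdr{}),
        ok.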
+ +init_db(DbName, Filepath, Fd, Header0, Options) -> + Header1 = simple_upgrade_record(Header0, #db_header{}), + Header = + case element(2, Header1) of + 1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR}); + 2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR}); + 3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR}); + 4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11 + 5 -> Header1; % pre 1.2 + ?LATEST_DISK_VERSION -> Header1; + _ -> throw({database_disk_version_error, "Incorrect disk header version"}) + end, + + {ok, FsyncOptions} = couch_util:parse_term( + config:get("couchdb", "fsync_options", + "[before_header, after_header, on_file_open]")), + + case lists:member(on_file_open, FsyncOptions) of + true -> ok = couch_file:sync(Fd); + _ -> ok + end, + + Compression = couch_compress:get_compression_method(), + + {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd, + [{split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2}, + {compression, Compression}]), + {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd, + [{split, fun ?MODULE:btree_by_seq_split/1}, + {join, fun ?MODULE:btree_by_seq_join/2}, + {reduce, fun ?MODULE:btree_by_seq_reduce/2}, + {compression, Compression}]), + {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd, + [{compression, Compression}]), + case Header#db_header.security_ptr of + nil -> + Security = [], + SecurityPtr = nil; + SecurityPtr -> + {ok, Security} = couch_file:pread_term(Fd, SecurityPtr) + end, + % convert start time tuple to microsecs and store as a binary string + {MegaSecs, Secs, MicroSecs} = now(), + StartTime = ?l2b(io_lib:format("~p", + [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])), + ok = couch_file:set_db_pid(Fd, self()), + #db{ + fd=Fd, + fd_monitor = erlang:monitor(process, Fd), + header=Header, + id_tree = IdBtree, + seq_tree = SeqBtree, + local_tree = LocalDocsBtree, + committed_update_seq = Header#db_header.update_seq, + update_seq = Header#db_header.update_seq, + name = DbName, + filepath = Filepath, + security = Security, + security_ptr = SecurityPtr, + instance_start_time = StartTime, + revs_limit = Header#db_header.revs_limit, + fsync_options = FsyncOptions, + options = Options, + compression = Compression, + before_doc_update = couch_util:get_value(before_doc_update, Options, nil), + after_doc_read = couch_util:get_value(after_doc_read, Options, nil) + }. + + +close_db(#db{fd_monitor = Ref}) -> + erlang:demonitor(Ref). + + +refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) -> + spawn(fabric, reset_validation_funs, [mem3:dbname(Name)]), + Db#db{validate_doc_funs = undefined}; +refresh_validate_doc_funs(Db0) -> + Db = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}}, + {ok, DesignDocs} = couch_db:get_design_docs(Db), + ProcessDocFuns = lists:flatmap( + fun(DesignDocInfo) -> + {ok, DesignDoc} = couch_db:open_doc_int( + Db, DesignDocInfo, [ejson_body]), + case couch_doc:get_validate_doc_fun(DesignDoc) of + nil -> []; + Fun -> [Fun] + end + end, DesignDocs), + Db#db{validate_doc_funs=ProcessDocFuns}. 
+ +% rev tree functions + +flush_trees(_Db, [], AccFlushedTrees) -> + {ok, lists:reverse(AccFlushedTrees)}; +flush_trees(#db{fd = Fd} = Db, + [InfoUnflushed | RestUnflushed], AccFlushed) -> + #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, + {Flushed, LeafsSize} = couch_key_tree:mapfold( + fun(_Rev, Value, Type, Acc) -> + case Value of + #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} -> + % this node value is actually an unwritten document summary, + % write to disk. + % make sure the Fd in the written bins is the same Fd we are + % and convert bins, removing the FD. + % All bins should have been written to disk already. + case {AttsFd, Fd} of + {nil, _} -> + ok; + {SameFd, SameFd} -> + ok; + _ -> + % Fd where the attachments were written to is not the same + % as our Fd. This can happen when a database is being + % switched out during a compaction. + ?LOG_DEBUG("File where the attachments are written has" + " changed. Possibly retrying.", []), + throw(retry) + end, + {ok, NewSummaryPointer, SummarySize} = + couch_file:append_raw_chunk(Fd, Summary), + TotalSize = lists:foldl( + fun(#att{att_len = L}, A) -> A + L end, + SummarySize, Value#doc.atts), + NewValue = #leaf{deleted=IsDeleted, ptr=NewSummaryPointer, + seq=UpdateSeq, size=TotalSize}, + case Type of + leaf -> + {NewValue, Acc + TotalSize}; + branch -> + {NewValue, Acc} + end; + {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil -> + {Value, Acc + LeafSize}; + _ -> + {Value, Acc} + end + end, 0, Unflushed), + InfoFlushed = InfoUnflushed#full_doc_info{ + rev_tree = Flushed, + leafs_size = LeafsSize + }, + flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]). + + +send_result(Client, Ref, NewResult) -> + % used to send a result to the client + catch(Client ! {result, self(), {Ref, NewResult}}). + +merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; +merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], + [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> + #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq} + = OldDocInfo, + {NewRevTree, _} = lists:foldl( + fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) -> + if not MergeConflicts -> + case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc), + Limit) of + {_NewTree, conflicts} when (not OldDeleted) -> + send_result(Client, Ref, conflict), + {AccTree, OldDeleted}; + {NewTree, conflicts} when PrevRevs /= [] -> + % Check to be sure if prev revision was specified, it's + % a leaf node in the tree + Leafs = couch_key_tree:get_all_leafs(AccTree), + IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) -> + {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)} + end, Leafs), + if IsPrevLeaf -> + {NewTree, OldDeleted}; + true -> + send_result(Client, Ref, conflict), + {AccTree, OldDeleted} + end; + {NewTree, no_conflicts} when AccTree == NewTree -> + % the tree didn't change at all + % meaning we are saving a rev that's already + % been editted again. + if (Pos == 1) and OldDeleted -> + % this means we are recreating a brand new document + % into a state that already existed before. 
+                    % put the rev into a subsequent edit of the deletion
+                    #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+                        couch_doc:to_doc_info(OldDocInfo),
+                    NewRevId = couch_db:new_revid(
+                        NewDoc#doc{revs={OldPos, [OldRev]}}),
+                    NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+                    {NewTree2, _} = couch_key_tree:merge(AccTree,
+                        couch_doc:to_path(NewDoc2), Limit),
+                    % we changed the rev id, this tells the caller we did
+                    send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
+                    {NewTree2, OldDeleted};
+                true ->
+                    send_result(Client, Ref, conflict),
+                    {AccTree, OldDeleted}
+                end;
+            {NewTree, _} ->
+                {NewTree, NewDoc#doc.deleted}
+            end;
+        true ->
+            {NewTree, _} = couch_key_tree:merge(AccTree,
+                couch_doc:to_path(NewDoc), Limit),
+            {NewTree, OldDeleted}
+        end
+        end,
+        {OldTree, OldDeleted0}, NewDocs),
+    if NewRevTree == OldTree ->
+        % nothing changed
+        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+            AccNewInfos, AccRemoveSeqs, AccSeq);
+    true ->
+        % we have updated the document, give it a new seq #
+        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+        RemoveSeqs = case OldSeq of
+            0 -> AccRemoveSeqs;
+            _ -> [OldSeq | AccRemoveSeqs]
+        end,
+        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+            [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+    end.
+
+
+
+new_index_entries([], AccById, AccDDocIds) ->
+    {AccById, AccDDocIds};
+new_index_entries([#full_doc_info{id=Id}=Info | Rest], AccById, AccDDocIds) ->
+    #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info),
+    AccById2 = [Info#full_doc_info{deleted=Del} | AccById],
+    AccDDocIds2 = case Id of
+        <<?DESIGN_DOC_PREFIX, _/binary>> -> [Id | AccDDocIds];
+        _ -> AccDDocIds
+    end,
+    new_index_entries(Rest, AccById2, AccDDocIds2).
+
+
+stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
+    [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
+        #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+
+update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
+    #db{
+        id_tree = DocInfoByIdBTree,
+        seq_tree = DocInfoBySeqBTree,
+        update_seq = LastSeq,
+        revs_limit = RevsLimit
+    } = Db,
+    Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
+    % look up the old documents, if they exist.
+    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+    OldDocInfos = lists:zipwith(
+        fun(_Id, {ok, FullDocInfo}) ->
+            FullDocInfo;
+        (Id, not_found) ->
+            #full_doc_info{id=Id}
+        end,
+        Ids, OldDocLookups),
+    % Merge the new docs into the revision trees.
+    {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
+        MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
+
+    % All documents are now ready to write.
+
+    {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+    % Write out the document summaries (the bodies are stored in the nodes of
+    % the trees, the attachments are already written to disk)
+    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
+
+    {IndexFullDocInfos, UpdatedDDocIds} =
+        new_index_entries(FlushedFullDocInfos, [], []),
+
+    % and the indexes
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
+
+    Db3 = Db2#db{
+        id_tree = DocInfoByIdBTree2,
+        seq_tree = DocInfoBySeqBTree2,
+        update_seq = NewSeq},
+
+    % Check if we just updated any design documents, and update the validation
+    % funs if we did.
+ Db4 = case length(UpdatedDDocIds) > 0 of + true -> + ddoc_cache:evict(Db3#db.name, UpdatedDDocIds), + refresh_validate_doc_funs(Db3); + false -> + Db3 + end, + + {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}. + +update_local_docs(Db, []) -> + {ok, Db}; +update_local_docs(#db{local_tree=Btree}=Db, Docs) -> + Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs], + OldDocLookups = couch_btree:lookup(Btree, Ids), + BtreeEntries = lists:zipwith( + fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, _OldDocLookup) -> + case PrevRevs of + [RevStr|_] -> + PrevRev = list_to_integer(?b2l(RevStr)); + [] -> + PrevRev = 0 + end, + %% disabled conflict checking for local docs -- APK 16 June 2010 + % OldRev = + % case OldDocLookup of + % {ok, {_, {OldRev0, _}}} -> OldRev0; + % not_found -> 0 + % end, + % case OldRev == PrevRev of + % true -> + case Delete of + false -> + send_result(Client, Ref, {ok, + {0, ?l2b(integer_to_list(PrevRev + 1))}}), + {update, {Id, {PrevRev + 1, Body}}}; + true -> + send_result(Client, Ref, + {ok, {0, <<"0">>}}), + {remove, Id} + end%; + % false -> + % send_result(Client, Ref, conflict), + % ignore + % end + end, Docs, OldDocLookups), + + BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], + BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], + + {ok, Btree2} = + couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), + + {ok, Db#db{local_tree = Btree2}}. + +db_to_header(Db, Header) -> + Header#db_header{ + update_seq = Db#db.update_seq, + seq_tree_state = couch_btree:get_state(Db#db.seq_tree), + id_tree_state = couch_btree:get_state(Db#db.id_tree), + local_tree_state = couch_btree:get_state(Db#db.local_tree), + security_ptr = Db#db.security_ptr, + revs_limit = Db#db.revs_limit}. + +commit_data(Db) -> + commit_data(Db, false). + +commit_data(#db{waiting_delayed_commit=nil} = Db, true) -> + TRef = erlang:send_after(1000,self(),delayed_commit), + Db#db{waiting_delayed_commit=TRef}; +commit_data(Db, true) -> + Db; +commit_data(Db, _) -> + #db{ + header = OldHeader, + waiting_delayed_commit = Timer + } = Db, + if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, + case db_to_header(Db, OldHeader) of + OldHeader -> Db#db{waiting_delayed_commit=nil}; + NewHeader -> sync_header(Db, NewHeader) + end. + +sync_header(Db, NewHeader) -> + #db{ + fd = Fd, + filepath = FilePath, + fsync_options = FsyncOptions, + waiting_delayed_commit = Timer + } = Db, + + if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, + + Before = lists:member(before_header, FsyncOptions), + After = lists:member(after_header, FsyncOptions), + + if Before -> couch_file:sync(FilePath); true -> ok end, + ok = couch_file:write_header(Fd, NewHeader), + if After -> couch_file:sync(FilePath); true -> ok end, + + Db#db{ + header=NewHeader, + committed_update_seq=Db#db.update_seq, + waiting_delayed_commit=nil + }. 
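commit_data/2 above coalesces writers: the first change after a commit arms a one-second delayed_commit timer, later changes ride on the pending timer, and the handle_info(delayed_commit, ...) clause performs a single header write for all of them. A stripped-down sketch of that pattern, with an invented #state{} record and a stub flush_to_disk/0 standing in for the couch_file header write and fsync:

    -module(delayed_commit_example).
    -export([note_update/1, handle_delayed_commit/1]).

    -record(state, {dirty = false, commit_timer = nil}).

    %% First change after a commit arms the timer; later changes only mark dirty.
    note_update(#state{commit_timer = nil} = St) ->
        TRef = erlang:send_after(1000, self(), delayed_commit),
        St#state{dirty = true, commit_timer = TRef};
    note_update(St) ->
        St#state{dirty = true}.

    %% Called when the owning process receives 'delayed_commit': one flush
    %% covers every update that arrived while the timer was pending.
    handle_delayed_commit(#state{dirty = true} = St) ->
        ok = flush_to_disk(),
        St#state{dirty = false, commit_timer = nil};
    handle_delayed_commit(St) ->
        St#state{commit_timer = nil}.

    flush_to_disk() ->
        ok. %% stand-in for couch_file:write_header/2 plus couch_file:sync/1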
+ +copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) -> + {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp), + BinInfos = case BinInfos0 of + _ when is_binary(BinInfos0) -> + couch_compress:decompress(BinInfos0); + _ when is_list(BinInfos0) -> + % pre 1.2 file format + BinInfos0 + end, + % copy the bin values + NewBinInfos = lists:map( - fun({Name, Type, BinSp, AttLen, RevPos, Md5}) -> ++ fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) -> + % 010 UPGRADE CODE - {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = ++ {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} = + couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity}; - ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) -> - {NewBinSp, AttLen, _, Md5, _IdentityMd5} = ++ check_md5(ExpectedMd5, ActualMd5), ++ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity}; ++ ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) -> ++ {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} = + couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), ++ check_md5(ExpectedMd5, ActualMd5), + Enc = case Enc1 of + true -> + % 0110 UPGRADE CODE + gzip; + false -> + % 0110 UPGRADE CODE + identity; + _ -> + Enc1 + end, - {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc} ++ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc} + end, BinInfos), + {BodyData, NewBinInfos}. + +merge_lookups(Infos, []) -> + Infos; +merge_lookups([], _) -> + []; +merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) -> + % Assert we've matched our lookups + if DI#doc_info.id == FDI#full_doc_info.id -> ok; true -> + erlang:error({mismatched_doc_infos, DI#doc_info.id}) + end, + [FDI | merge_lookups(RestInfos, RestLookups)]; +merge_lookups([FDI | RestInfos], Lookups) -> + [FDI | merge_lookups(RestInfos, Lookups)]. + ++check_md5(Md5, Md5) -> ok; ++check_md5(_, _) -> throw(md5_mismatch). ++ +copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) -> + DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], + LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), + % COUCHDB-968, make sure we prune duplicates during compaction + NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) -> + A =< B + end, merge_lookups(MixedInfos, LookupResults)), + + NewInfos1 = lists:map( + fun(#full_doc_info{rev_tree=RevTree}=Info) -> + Info#full_doc_info{rev_tree=couch_key_tree:map( + fun(_, _, branch) -> + ?REV_MISSING; + (_Rev, #leaf{ptr=Sp}=Leaf, leaf) -> + {_Body, AttsInfo} = Summary = copy_doc_attachments( + Db, Sp, DestFd), + SummaryChunk = make_doc_summary(NewDb, Summary), + {ok, Pos, SummarySize} = couch_file:append_raw_chunk( + DestFd, SummaryChunk), + TotalLeafSize = lists:foldl( + fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end, + SummarySize, AttsInfo), + Leaf#leaf{ptr=Pos, size=TotalLeafSize} + end, RevTree)} + end, NewInfos0), + + NewInfos = stem_full_doc_infos(Db, NewInfos1), + RemoveSeqs = + case Retry of + nil -> + []; + OldDocIdTree -> + % Compaction is being rerun to catch up to writes during the + % first pass. This means we may have docs that already exist + % in the seq_tree in the .data file. Here we lookup any old + % update_seqs so that they can be removed. 
+ Ids = [Id || #full_doc_info{id=Id} <- NewInfos], + Existing = couch_btree:lookup(OldDocIdTree, Ids), + [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing] + end, + + {ok, SeqTree} = couch_btree:add_remove( + NewDb#db.seq_tree, NewInfos, RemoveSeqs), + + FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) -> + {{Id, Seq}, FDI} + end, NewInfos), + {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs), + update_compact_task(length(NewInfos)), + NewDb#db{id_tree=IdEms, seq_tree=SeqTree}. + + +copy_compact(Db, NewDb0, Retry) -> + Compression = couch_compress:get_compression_method(), + NewDb = NewDb0#db{compression=Compression}, + TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), + BufferSize = list_to_integer( + config:get("database_compaction", "doc_buffer_size", "524288")), + CheckpointAfter = couch_util:to_integer( + config:get("database_compaction", "checkpoint_after", + BufferSize * 10)), + + EnumBySeqFun = + fun(DocInfo, _Offset, + {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) -> + + Seq = case DocInfo of + #full_doc_info{} -> DocInfo#full_doc_info.update_seq; + #doc_info{} -> DocInfo#doc_info.high_seq + end, + + AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo), + if AccUncopiedSize2 >= BufferSize -> + NewDb2 = copy_docs( + Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), + AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, + if AccCopiedSize2 >= CheckpointAfter -> + CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}), + {ok, {CommNewDb2, [], 0, 0}}; + true -> + {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}} + end; + true -> + {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2, + AccCopiedSize}} + end + end, + + TaskProps0 = [ + {type, database_compaction}, + {database, Db#db.name}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ], + case (Retry =/= nil) and couch_task_status:is_task_added() of + true -> + couch_task_status:update([ + {retry, true}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ]); + false -> + couch_task_status:add_task(TaskProps0), + couch_task_status:set_update_frequency(500) + end, + + {ok, _, {NewDb2, Uncopied, _, _}} = + couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun, + {NewDb, [], 0, 0}, + [{start_key, NewDb#db.update_seq + 1}]), + + NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), + + % copy misc header values + if NewDb3#db.security /= Db#db.security -> + {ok, Ptr, _} = couch_file:append_term( + NewDb3#db.fd, Db#db.security, + [{compression, NewDb3#db.compression}]), + NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr}; + true -> + NewDb4 = NewDb3 + end, + + commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}). + + +start_copy_compact(#db{}=Db) -> + #db{name=Name, filepath=Filepath, options=Options} = Db, + ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), + + {ok, NewDb, DName, DFd, MFd, Retry} = + open_compaction_files(Name, Filepath, Options), + erlang:monitor(process, MFd), + + % This is a bit worrisome. init_db/4 will monitor the data fd + % but it doesn't know about the meta fd. For now I'll maintain + % that the data fd is the old normal fd and meta fd is special + % and hope everything works out for the best. 
+ unlink(DFd), + + NewDb1 = copy_purge_info(Db, NewDb), + NewDb2 = copy_compact(Db, NewDb1, Retry), + NewDb3 = sort_meta_data(NewDb2), + NewDb4 = commit_compaction_data(NewDb3), + NewDb5 = copy_meta_data(NewDb4), + NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)), + close_db(NewDb6), + + ok = couch_file:close(MFd), + gen_server:cast(Db#db.main_pid, {compact_done, DName}). + + +open_compaction_files(DbName, DbFilePath, Options) -> + DataFile = DbFilePath ++ ".compact.data", + MetaFile = DbFilePath ++ ".compact.meta", + {ok, DataFd, DataHdr} = open_compaction_file(DataFile), + {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), + case {DataHdr, MetaHdr} of + {#comp_header{}=A, #comp_header{}=A} -> + DbHeader = A#comp_header.db_header, + Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options), + Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state), + {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; + {#db_header{}, _} -> + ok = reset_compaction_file(MetaFd, #db_header{}), + Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options), + Db1 = bind_emsort(Db0, MetaFd, nil), + {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; + _ -> + Header = #db_header{}, + ok = reset_compaction_file(DataFd, Header), + ok = reset_compaction_file(MetaFd, Header), + Db0 = init_db(DbName, DataFile, DataFd, Header, Options), + Db1 = bind_emsort(Db0, MetaFd, nil), + {ok, Db1, DataFile, DataFd, MetaFd, nil} + end. + + +open_compaction_file(FilePath) -> - case couch_file:open(FilePath) of ++ case couch_file:open(FilePath, [nologifmissing]) of + {ok, Fd} -> + case couch_file:read_header(Fd) of + {ok, Header} -> {ok, Fd, Header}; + no_valid_header -> {ok, Fd, nil} + end; + {error, enoent} -> + {ok, Fd} = couch_file:open(FilePath, [create]), + {ok, Fd, nil} + end. + + +reset_compaction_file(Fd, Header) -> + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, Header). + + +copy_purge_info(OldDb, NewDb) -> + OldHdr = OldDb#db.header, + NewHdr = NewDb#db.header, + if OldHdr#db_header.purge_seq > 0 -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb), + Opts = [{compression, NewDb#db.compression}], + {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts), + NewDb#db{ + header=NewHdr#db_header{ + purge_seq=OldHdr#db_header.purge_seq, + purged_docs=Ptr + } + }; + true -> + NewDb + end. + + +commit_compaction_data(#db{}=Db) -> + % Compaction needs to write headers to both the data file + % and the meta file so if we need to restart we can pick + % back up from where we left off. + commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)), + commit_compaction_data(Db, Db#db.fd). + + +commit_compaction_data(#db{header=OldHeader}=Db0, Fd) -> + % Mostly copied from commit_data/2 but I have to + % replace the logic to commit and fsync to a specific + % fd instead of the Filepath stuff that commit_data/2 + % does. + DataState = OldHeader#db_header.id_tree_state, + MetaFd = couch_emsort:get_fd(Db0#db.id_tree), + MetaState = couch_emsort:get_state(Db0#db.id_tree), + Db1 = bind_id_tree(Db0, Db0#db.fd, DataState), + Header = db_to_header(Db1, OldHeader), + CompHeader = #comp_header{ + db_header = Header, + meta_state = MetaState + }, + ok = couch_file:sync(Fd), + ok = couch_file:write_header(Fd, CompHeader), + Db2 = Db1#db{ + waiting_delayed_commit=nil, + header=Header, + committed_update_seq=Db1#db.update_seq + }, + bind_emsort(Db2, MetaFd, MetaState). 
+ + +bind_emsort(Db, Fd, nil) -> + {ok, Ems} = couch_emsort:open(Fd), + Db#db{id_tree=Ems}; +bind_emsort(Db, Fd, State) -> + {ok, Ems} = couch_emsort:open(Fd, [{root, State}]), + Db#db{id_tree=Ems}. + + +bind_id_tree(Db, Fd, State) -> + {ok, IdBtree} = couch_btree:open(State, Fd, [ + {split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2} + ]), + Db#db{id_tree=IdBtree}. + + +sort_meta_data(Db0) -> + {ok, Ems} = couch_emsort:merge(Db0#db.id_tree), + Db0#db{id_tree=Ems}. + + +copy_meta_data(#db{fd=Fd, header=Header}=Db) -> + Src = Db#db.id_tree, + DstState = Header#db_header.id_tree_state, + {ok, IdTree0} = couch_btree:open(DstState, Fd, [ + {split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2} + ]), + {ok, Iter} = couch_emsort:iter(Src), + Acc0 = #merge_st{ + id_tree=IdTree0, + seq_tree=Db#db.seq_tree, + rem_seqs=[], + infos=[] + }, + Acc = merge_docids(Iter, Acc0), + {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos), + {ok, SeqTree} = couch_btree:add_remove( + Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs + ), + Db#db{id_tree=IdTree, seq_tree=SeqTree}. + + +merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 -> + #merge_st{ + id_tree=IdTree0, + seq_tree=SeqTree0, + rem_seqs=RemSeqs + } = Acc, + {ok, IdTree1} = couch_btree:add(IdTree0, Infos), + {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs), + Acc1 = Acc#merge_st{ + id_tree=IdTree1, + seq_tree=SeqTree1, + rem_seqs=[], + infos=[] + }, + merge_docids(Iter, Acc1); +merge_docids(Iter, #merge_st{curr=Curr}=Acc) -> + case next_info(Iter, Curr, []) of + {NextIter, NewCurr, FDI, Seqs} -> + Acc1 = Acc#merge_st{ + infos = [FDI | Acc#merge_st.infos], + rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, + curr = NewCurr + }, + merge_docids(NextIter, Acc1); + {finished, FDI, Seqs} -> + Acc#merge_st{ + infos = [FDI | Acc#merge_st.infos], + rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, + curr = undefined + }; + empty -> + Acc + end. + + +next_info(Iter, undefined, []) -> + case couch_emsort:next(Iter) of + {ok, {{Id, Seq}, FDI}, NextIter} -> + next_info(NextIter, {Id, Seq, FDI}, []); + finished -> + empty + end; +next_info(Iter, {Id, Seq, FDI}, Seqs) -> + case couch_emsort:next(Iter) of + {ok, {{Id, NSeq}, NFDI}, NextIter} -> + next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]); + {ok, {{NId, NSeq}, NFDI}, NextIter} -> + {NextIter, {NId, NSeq, NFDI}, FDI, Seqs}; + finished -> + {finished, FDI, Seqs} + end. + + +update_compact_task(NumChanges) -> + [Changes, Total] = couch_task_status:get([changes_done, total_changes]), + Changes2 = Changes + NumChanges, + Progress = case Total of + 0 -> + 0; + _ -> + (Changes2 * 100) div Total + end, + couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). + + +make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) -> + Body = case couch_compress:is_compressed(Body0, Comp) of + true -> + Body0; + false -> + % pre 1.2 database file format + couch_compress:compress(Body0, Comp) + end, + Atts = case couch_compress:is_compressed(Atts0, Comp) of + true -> + Atts0; + false -> + couch_compress:compress(Atts0, Comp) + end, + SummaryBin = ?term_to_bin({Body, Atts}), + couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)). 
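One behavioural change this merge brings into the compactor is the check_md5/2 assertion in copy_doc_attachments/3: the MD5 recorded with each attachment is compared against the MD5 reported while re-streaming the body into the compacted file, and compaction throws md5_mismatch rather than copying corruption forward. A self-contained sketch of the same verify-while-copy idea using plain files and the OTP crypto module; it is not the couch_stream implementation:

    -module(md5_copy_check).
    -export([copy_verify/3]).

    %% Copy Source to Dest in chunks, hashing what was actually read, then
    %% compare against the digest we expected (same pattern as check_md5/2).
    copy_verify(Source, Dest, ExpectedMd5) ->
        {ok, In} = file:open(Source, [read, raw, binary]),
        {ok, Out} = file:open(Dest, [write, raw, binary]),
        Ctx = copy_loop(In, Out, crypto:hash_init(md5)),
        ok = file:close(In),
        ok = file:close(Out),
        check_md5(ExpectedMd5, crypto:hash_final(Ctx)).

    copy_loop(In, Out, Ctx) ->
        case file:read(In, 65536) of
            {ok, Chunk} ->
                ok = file:write(Out, Chunk),
                copy_loop(In, Out, crypto:hash_update(Ctx, Chunk));
            eof ->
                Ctx
        end.

    check_md5(Md5, Md5) -> ok;
    check_md5(_, _) -> throw(md5_mismatch).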
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_doc.erl ---------------------------------------------------------------------- diff --cc src/couch_doc.erl index 6f2ca9b,0000000..05202f4 mode 100644,000000..100644 --- a/src/couch_doc.erl +++ b/src/couch_doc.erl @@@ -1,784 -1,0 +1,787 @@@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_doc). + +-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]). +-export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]). +-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). +-export([validate_docid/1]). +-export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]). +-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). +-export([abort_multi_part_stream/1, restart_open_doc_revs/3]). +-export([to_path/1]). +-export([mp_parse_doc/2]). +-export([with_ejson_body/1]). + +-include_lib("couch/include/couch_db.hrl"). + +-spec to_path(#doc{}) -> path(). +to_path(#doc{revs={Start, RevIds}}=Doc) -> + [Branch] = to_branch(Doc, lists:reverse(RevIds)), + {Start - length(RevIds) + 1, Branch}. + +-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()]. +to_branch(Doc, [RevId]) -> + [{RevId, Doc, []}]; +to_branch(Doc, [RevId | Rest]) -> + [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}]. + +% helpers used by to_json_obj +to_json_rev(0, []) -> + []; +to_json_rev(Start, [FirstRevId|_]) -> + [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}]. + +to_json_body(true, {Body}) -> + Body ++ [{<<"_deleted">>, true}]; +to_json_body(false, {Body}) -> + Body. + +to_json_revisions(Options, Start, RevIds) -> + case lists:member(revs, Options) of + false -> []; + true -> + [{<<"_revisions">>, {[{<<"start">>, Start}, + {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}] + end. + +revid_to_str(RevId) when size(RevId) =:= 16 -> + ?l2b(couch_util:to_hex(RevId)); +revid_to_str(RevId) -> + RevId. + +rev_to_str({Pos, RevId}) -> + ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]). + + +revs_to_strs([]) -> + []; +revs_to_strs([{Pos, RevId}| Rest]) -> + [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)]. + +to_json_meta(Meta) -> + lists:map( + fun({revs_info, Start, RevsInfo}) -> + {JsonRevsInfo, _Pos} = lists:mapfoldl( + fun({RevId, Status}, PosAcc) -> + JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})}, + {<<"status">>, ?l2b(atom_to_list(Status))}]}, + {JsonObj, PosAcc - 1} + end, Start, RevsInfo), + {<<"_revs_info">>, JsonRevsInfo}; + ({local_seq, Seq}) -> + {<<"_local_seq">>, Seq}; + ({conflicts, Conflicts}) -> + {<<"_conflicts">>, revs_to_strs(Conflicts)}; + ({deleted_conflicts, DConflicts}) -> + {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)} + end, Meta). + +to_json_attachments(Attachments, Options) -> + to_json_attachments( + Attachments, + lists:member(attachments, Options), + lists:member(follows, Options), + lists:member(att_encoding_info, Options) + ). 
+
+to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
+    [];
+to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
+    AttProps = lists:map(
+        fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
+            {Att#att.name, {[
+                {<<"content_type">>, Att#att.type},
+                {<<"revpos">>, Att#att.revpos}] ++
+                case Att#att.md5 of
+                    <<>> ->
+                        [];
+                    Md5 ->
+                        EncodedMd5 = base64:encode(Md5),
+                        [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}]
+                end ++
+                if not OutputData orelse Att#att.data == stub ->
+                    [{<<"length">>, DiskLen}, {<<"stub">>, true}];
+                true ->
+                    if DataToFollow ->
+                        [{<<"length">>, DiskLen}, {<<"follows">>, true}];
+                    true ->
+                        AttData = case Enc of
+                        gzip ->
+                            zlib:gunzip(att_to_bin(Att));
+                        identity ->
+                            att_to_bin(Att)
+                        end,
+                        [{<<"data">>, base64:encode(AttData)}]
+                    end
+                end ++
+                case {ShowEncInfo, Enc} of
+                    {false, _} ->
+                        [];
+                    {true, identity} ->
+                        [];
+                    {true, _} ->
+                        [
+                            {<<"encoding">>, couch_util:to_binary(Enc)},
+                            {<<"encoded_length">>, AttLen}
+                        ]
+                end
+            }}
+        end, Atts),
+    [{<<"_attachments">>, {AttProps}}].
+
+to_json_obj(Doc, Options) ->
+    doc_to_json_obj(with_ejson_body(Doc), Options).
+
+doc_to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
+        meta=Meta}=Doc,Options)->
+    {[{<<"_id">>, Id}]
+        ++ to_json_rev(Start, RevIds)
+        ++ to_json_body(Del, Body)
+        ++ to_json_revisions(Options, Start, RevIds)
+        ++ to_json_meta(Meta)
+        ++ to_json_attachments(Doc#doc.atts, Options)
+    }.
+
+from_json_obj({Props}) ->
+    transfer_fields(Props, #doc{body=[]});
+
+from_json_obj(_Other) ->
+    throw({bad_request, "Document must be a JSON object"}).
+
+parse_revid(RevId) when size(RevId) =:= 32 ->
+    RevInt = erlang:list_to_integer(?b2l(RevId), 16),
+    <<RevInt:128>>;
+parse_revid(RevId) when length(RevId) =:= 32 ->
+    RevInt = erlang:list_to_integer(RevId, 16),
+    <<RevInt:128>>;
+parse_revid(RevId) when is_binary(RevId) ->
+    RevId;
+parse_revid(RevId) when is_list(RevId) ->
+    ?l2b(RevId).
+
+
+parse_rev(Rev) when is_binary(Rev) ->
+    parse_rev(?b2l(Rev));
+parse_rev(Rev) when is_list(Rev) ->
+    SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
+    case SplitRev of
+        {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)};
+        _Else -> throw({bad_request, <<"Invalid rev format">>})
+    end;
+parse_rev(_BadRev) ->
+    throw({bad_request, <<"Invalid rev format">>}).
+
+parse_revs([]) ->
+    [];
+parse_revs([Rev | Rest]) ->
+    [parse_rev(Rev) | parse_revs(Rest)].
+
+
+validate_docid(<<"">>) ->
+    throw({bad_request, <<"Document id must not be empty">>});
+validate_docid(Id) when is_binary(Id) ->
+    case couch_util:validate_utf8(Id) of
+        false -> throw({bad_request, <<"Document id must be valid UTF-8">>});
+        true -> ok
+    end,
+    case Id of
+        <<"_design/", _/binary>> -> ok;
+        <<"_local/", _/binary>> -> ok;
+        <<"_", _/binary>> ->
+            throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
+        _Else -> ok
+    end;
+validate_docid(Id) ->
+    ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
+    throw({bad_request, <<"Document id must be a string">>}).
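parse_rev/1 and parse_revid/1 above turn the wire-format "<pos>-<hexdigest>" revision strings into {Pos, RevId} pairs, packing a 32-character hex digest into a 16-byte binary. A small standalone round trip of that encoding, assuming the usual 128-bit digest; the module and helper names are invented:

    -module(rev_format_example).
    -export([demo/0]).

    %% "<pos>-<32 hex chars>" -> {Pos, <<128-bit binary>>}
    parse_rev(Rev) when is_binary(Rev) ->
        [PosBin, HexBin] = binary:split(Rev, <<"-">>),
        RevInt = binary_to_integer(HexBin, 16),
        {binary_to_integer(PosBin), <<RevInt:128>>}.

    %% inverse direction, mirroring rev_to_str/1 above
    rev_to_str({Pos, <<RevInt:128>>}) ->
        Hex = io_lib:format("~32.16.0b", [RevInt]),
        iolist_to_binary([integer_to_binary(Pos), "-", Hex]).

    demo() ->
        Rev = <<"3-00000000000000000000000000abcdef">>,
        {3, <<_:128>>} = Parsed = parse_rev(Rev),
        Rev = rev_to_str(Parsed),
        ok.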
+ +transfer_fields([], #doc{body=Fields}=Doc) -> + % convert fields back to json object + Doc#doc{body={lists:reverse(Fields)}}; + +transfer_fields([{<<"_id">>, Id} | Rest], Doc) -> + validate_docid(Id), + transfer_fields(Rest, Doc#doc{id=Id}); + +transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) -> + {Pos, RevId} = parse_rev(Rev), + transfer_fields(Rest, + Doc#doc{revs={Pos, [RevId]}}); + +transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) -> + % we already got the rev from the _revisions + transfer_fields(Rest,Doc); + +transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> + Atts = lists:map(fun({Name, {BinProps}}) -> + Md5 = case couch_util:get_value(<<"digest">>, BinProps) of + <<"md5-",EncodedMd5/binary>> -> + base64:decode(EncodedMd5); + _ -> + <<>> + end, + case couch_util:get_value(<<"stub">>, BinProps) of + true -> + Type = couch_util:get_value(<<"content_type">>, BinProps), + RevPos = couch_util:get_value(<<"revpos">>, BinProps, nil), + DiskLen = couch_util:get_value(<<"length">>, BinProps), + {Enc, EncLen} = att_encoding_info(BinProps), + #att{name=Name, data=stub, type=Type, att_len=EncLen, + disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5}; + _ -> + Type = couch_util:get_value(<<"content_type">>, BinProps, + ?DEFAULT_ATTACHMENT_CONTENT_TYPE), + RevPos = couch_util:get_value(<<"revpos">>, BinProps, 0), + case couch_util:get_value(<<"follows">>, BinProps) of + true -> + DiskLen = couch_util:get_value(<<"length">>, BinProps), + {Enc, EncLen} = att_encoding_info(BinProps), + #att{name=Name, data=follows, type=Type, encoding=Enc, + att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5}; + _ -> + Value = couch_util:get_value(<<"data">>, BinProps), + Bin = base64:decode(Value), + LenBin = size(Bin), + #att{name=Name, data=Bin, type=Type, att_len=LenBin, + disk_len=LenBin, revpos=RevPos} + end + end + end, JsonBins), + transfer_fields(Rest, Doc#doc{atts=Atts}); + +transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) -> + RevIds = couch_util:get_value(<<"ids">>, Props), + Start = couch_util:get_value(<<"start">>, Props), + if not is_integer(Start) -> + throw({doc_validation, "_revisions.start isn't an integer."}); + not is_list(RevIds) -> + throw({doc_validation, "_revisions.ids isn't a array."}); + true -> + ok + end, + [throw({doc_validation, "RevId isn't a string"}) || + RevId <- RevIds, not is_binary(RevId)], + RevIds2 = [parse_revid(RevId) || RevId <- RevIds], + transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}}); + +transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when is_boolean(B) -> + transfer_fields(Rest, Doc#doc{deleted=B}); + +% ignored fields +transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); +transfer_fields([{<<"_local_seq">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); +transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); +transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); + +% special fields for replication documents +transfer_fields([{<<"_replication_state">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); +transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); ++transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest], ++ #doc{body=Fields} = Doc) -> ++ transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); 
+transfer_fields([{<<"_replication_id">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); +transfer_fields([{<<"_replication_stats">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); + +% unknown special field +transfer_fields([{<<"_",Name/binary>>, _} | _], _) -> + throw({doc_validation, + ?l2b(io_lib:format("Bad special document member: _~s", [Name]))}); + +transfer_fields([Field | Rest], #doc{body=Fields}=Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}). + +att_encoding_info(BinProps) -> + DiskLen = couch_util:get_value(<<"length">>, BinProps), + case couch_util:get_value(<<"encoding">>, BinProps) of + undefined -> + {identity, DiskLen}; + Enc -> + EncodedLen = couch_util:get_value(<<"encoded_length">>, BinProps, DiskLen), + {list_to_existing_atom(?b2l(Enc)), EncodedLen} + end. + +to_doc_info(FullDocInfo) -> + {DocInfo, _Path} = to_doc_info_path(FullDocInfo), + DocInfo. + +max_seq(Tree, UpdateSeq) -> + FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) -> + case Value of + {_Deleted, _DiskPos, OldTreeSeq} -> + % Older versions didn't track data sizes. + erlang:max(MaxOldSeq, OldTreeSeq); + {_Deleted, _DiskPos, OldTreeSeq, _Size} -> % necessary clause? + % Older versions didn't store #leaf records. + erlang:max(MaxOldSeq, OldTreeSeq); + #leaf{seq=OldTreeSeq} -> + erlang:max(MaxOldSeq, OldTreeSeq); + _ -> + MaxOldSeq + end + end, + couch_key_tree:fold(FoldFun, UpdateSeq, Tree). + +to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=FDISeq}) -> + RevInfosAndPath = [ + {#rev_info{ + deleted = Leaf#leaf.deleted, + body_sp = Leaf#leaf.ptr, + seq = Leaf#leaf.seq, + rev = {Pos, RevId} + }, Path} || {Leaf, {Pos, [RevId | _]} = Path} <- + couch_key_tree:get_all_leafs(Tree) + ], + SortedRevInfosAndPath = lists:sort( + fun({#rev_info{deleted=DeletedA,rev=RevA}, _PathA}, + {#rev_info{deleted=DeletedB,rev=RevB}, _PathB}) -> + % sort descending by {not deleted, rev} + {not DeletedA, RevA} > {not DeletedB, RevB} + end, RevInfosAndPath), + [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath, + RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath], + {#doc_info{id=Id, high_seq=max_seq(Tree, FDISeq), revs=RevInfos}, WinPath}. + + + + +att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) -> + Fun(Bin, Acc); +att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> + couch_stream:foldl(Fd, Sp, Md5, Fun, Acc); +att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) -> + fold_streamed_data(DataFun, Len, Fun, Acc); +att_foldl(#att{data={follows, Parser, Ref}}=Att, Fun, Acc) -> + ParserRef = erlang:monitor(process, Parser), + DataFun = fun() -> + Parser ! {get_bytes, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + couch_doc:restart_open_doc_revs(Parser, Ref, NewRef); + {bytes, Ref, Bytes} -> + Bytes; + {'DOWN', ParserRef, _, _, Reason} -> + throw({mp_parser_died, Reason}) + end + end, + try + att_foldl(Att#att{data=DataFun}, Fun, Acc) + after + erlang:demonitor(ParserRef, [flush]) + end. + +range_att_foldl(#att{data={Fd,Sp}}, From, To, Fun, Acc) -> + couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc). + +att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) -> + couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc); +att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) -> + fold_streamed_data(Fun2, Len, Fun, Acc). 
+ +att_to_bin(#att{data=Bin}) when is_binary(Bin) -> + Bin; +att_to_bin(#att{data=Iolist}) when is_list(Iolist) -> + iolist_to_binary(Iolist); +att_to_bin(#att{data={_Fd,_Sp}}=Att) -> + iolist_to_binary( + lists:reverse(att_foldl( + Att, + fun(Bin,Acc) -> [Bin|Acc] end, + [] + )) + ); +att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)-> + iolist_to_binary( + lists:reverse(fold_streamed_data( + DataFun, + Len, + fun(Data, Acc) -> [Data | Acc] end, + [] + )) + ). + +get_validate_doc_fun({Props}) -> + get_validate_doc_fun(couch_doc:from_json_obj({Props})); +get_validate_doc_fun(#doc{body={Props}}=DDoc) -> + case couch_util:get_value(<<"validate_doc_update">>, Props) of + undefined -> + nil; + _Else -> + fun(EditDoc, DiskDoc, Ctx, SecObj) -> + couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) + end + end. + + +has_stubs(#doc{atts=Atts}) -> + has_stubs(Atts); +has_stubs([]) -> + false; +has_stubs([#att{data=stub}|_]) -> + true; +has_stubs([_Att|Rest]) -> + has_stubs(Rest). + +merge_stubs(#doc{id = Id}, nil) -> + throw({missing_stub, <<"Previous revision missing for document ", Id/binary>>}); +merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) -> + BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]), + MergedBins = lists:map( + fun(#att{name=Name, data=stub, revpos=StubRevPos}) -> + case dict:find(Name, BinDict) of + {ok, #att{revpos=DiskRevPos}=DiskAtt} + when DiskRevPos == StubRevPos orelse StubRevPos == nil -> + DiskAtt; + _ -> + throw({missing_stub, + <<"id:", Id/binary, ", name:", Name/binary>>}) + end; + (Att) -> + Att + end, MemBins), + StubsDoc#doc{atts= MergedBins}. + +fold_streamed_data(_RcvFun, 0, _Fun, Acc) -> + Acc; +fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> + Bin = RcvFun(), + ResultAcc = Fun(Bin, Acc), + fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). + +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) -> + AttsSize = lists:foldl(fun(Att, AccAttsSize) -> + #att{ + data=Data, + name=Name, + att_len=AttLen, + disk_len=DiskLen, + type=Type, + encoding=Encoding + } = Att, + case Data of + stub -> + AccAttsSize; + _ -> + AccAttsSize + + 4 + % "\r\n\r\n" + case SendEncodedAtts of + true -> + % header + length(integer_to_list(AttLen)) + + AttLen; + _ -> + % header + length(integer_to_list(DiskLen)) + + DiskLen + end + + 4 + % "\r\n--" + size(Boundary) + + + % attachment headers + % (the length of the Content-Length has already been set) + size(Name) + + size(Type) + + length("\r\nContent-Disposition: attachment; filename=\"\"") + + length("\r\nContent-Type: ") + + length("\r\nContent-Length: ") + + case Encoding of + identity -> + 0; + _ -> + length(atom_to_list(Encoding)) + + length("\r\nContent-Encoding: ") + end + end + end, 0, Atts), + if AttsSize == 0 -> + {<<"application/json">>, iolist_size(JsonBytes)}; + true -> + {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>, + 2 + % "--" + size(Boundary) + + 36 + % "\r\ncontent-type: application/json\r\n\r\n" + iolist_size(JsonBytes) + + 4 + % "\r\n--" + size(Boundary) + + + AttsSize + + 2 % "--" + } + end. 
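[editor's note] The integer constants in len_doc_to_multi_part_stream/4 are the byte lengths of the fixed separators that the streaming side writes, which is how the advertised Content-Length stays in line with the body later produced by doc_to_multi_part_stream/5 and atts_to_mp/4. A small self-check sketch, illustrative only and not part of the patch:

-module(mp_overhead_example).
-export([check/0]).

% Each constant matches a literal emitted by the multipart writer functions.
check() ->
    4  = byte_size(<<"\r\n\r\n">>),                                   % blank line before a part body
    4  = byte_size(<<"\r\n--">>),                                     % CRLF plus boundary lead-in
    36 = byte_size(<<"\r\nContent-Type: application/json\r\n\r\n">>), % JSON part header
    2  = byte_size(<<"--">>),                                         % leading/trailing boundary dashes
    ok.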
+ +doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, + SendEncodedAtts) -> + case lists:any(fun(#att{data=Data})-> Data /= stub end, Atts) of + true -> + WriteFun([<<"--", Boundary/binary, + "\r\nContent-Type: application/json\r\n\r\n">>, + JsonBytes, <<"\r\n--", Boundary/binary>>]), + atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts); + false -> + WriteFun(JsonBytes) + end. + +atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) -> + WriteFun(<<"--">>); +atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun, + SendEncodedAtts) -> + atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts); +atts_to_mp([Att | RestAtts], Boundary, WriteFun, + SendEncodedAtts) -> + #att{ + name=Name, + att_len=AttLen, + disk_len=DiskLen, + type=Type, + encoding=Encoding + } = Att, + + % write headers + LengthBin = case SendEncodedAtts of + true -> list_to_binary(integer_to_list(AttLen)); + false -> list_to_binary(integer_to_list(DiskLen)) + end, + WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>), + WriteFun(<<"\r\nContent-Type: ", Type/binary>>), + WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>), + case Encoding of + identity -> + ok; + _ -> + EncodingBin = atom_to_binary(Encoding, latin1), + WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>) + end, + + % write data + WriteFun(<<"\r\n\r\n">>), + AttFun = case SendEncodedAtts of + false -> + fun att_foldl_decode/3; + true -> + fun att_foldl/3 + end, + AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok), + WriteFun(<<"\r\n--", Boundary/binary>>), + atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts). + + +doc_from_multi_part_stream(ContentType, DataFun) -> + doc_from_multi_part_stream(ContentType, DataFun, make_ref()). + + +doc_from_multi_part_stream(ContentType, DataFun, Ref) -> + Parent = self(), + NumMpWriters = num_mp_writers(), + Parser = spawn_link(fun() -> + ParentRef = erlang:monitor(process, Parent), + put(mp_parent_ref, ParentRef), + put(num_mp_writers, NumMpWriters), + {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request( + ContentType, DataFun, + fun(Next) -> mp_parse_doc(Next, []) end), + unlink(Parent) + end), + ParserRef = erlang:monitor(process, Parser), + Parser ! {get_doc_bytes, Ref, self()}, + receive + {started_open_doc_revs, NewRef} -> + restart_open_doc_revs(Parser, Ref, NewRef); + {doc_bytes, Ref, DocBytes} -> + Doc = from_json_obj(?JSON_DECODE(DocBytes)), + % we'll send the Parser process ID to the remote nodes so they can + % retrieve their own copies of the attachment data + Atts2 = lists:map( + fun(#att{data=follows}=A) -> + A#att{data={follows, Parser, Ref}}; + (A) -> + A + end, Doc#doc.atts), + WaitFun = fun() -> + receive {'DOWN', ParserRef, _, _, _} -> ok end, + erlang:put(mochiweb_request_recv, true) + end, + {ok, Doc#doc{atts=Atts2}, WaitFun, Parser} + end. + + +mp_parse_doc({headers, H}, []) -> + case couch_util:get_value("content-type", H) of + {"application/json", _} -> + fun (Next) -> + mp_parse_doc(Next, []) + end + end; +mp_parse_doc({body, Bytes}, AccBytes) -> + fun (Next) -> + mp_parse_doc(Next, [Bytes | AccBytes]) + end; +mp_parse_doc(body_end, AccBytes) -> + receive {get_doc_bytes, Ref, From} -> + From ! {doc_bytes, Ref, lists:reverse(AccBytes)} + end, + fun(Next) -> + mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []}) + end. 
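[editor's note] A minimal usage sketch of the pair above, assuming both functions are exported from couch_doc and that Fd is an already-open file: len_doc_to_multi_part_stream/4 supplies the Content-Type and Content-Length headers up front, and doc_to_multi_part_stream/5 then streams the matching body through the caller's WriteFun. The module name, Fd, Boundary, JsonBytes and Atts are placeholders, not part of the patch:

-module(doc_mp_send_example).
-export([send_doc/4]).

send_doc(Fd, Boundary, JsonBytes, Atts) ->
    SendEncodedAtts = true, % stream attachments in their on-disk (encoded) form
    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
        Boundary, JsonBytes, Atts, SendEncodedAtts),
    ok = file:write(Fd, [<<"Content-Type: ">>, ContentType,
                         <<"\r\nContent-Length: ">>, integer_to_list(Len),
                         <<"\r\n\r\n">>]),
    couch_doc:doc_to_multi_part_stream(Boundary, JsonBytes, Atts,
        fun(Data) -> ok = file:write(Fd, Data) end, SendEncodedAtts).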
+ +mp_parse_atts({headers, _}, Acc) -> + fun(Next) -> mp_parse_atts(Next, Acc) end; +mp_parse_atts(body_end, Acc) -> + fun(Next) -> mp_parse_atts(Next, Acc) end; +mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) -> + case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of + abort_parsing -> + fun(Next) -> mp_abort_parse_atts(Next, nil) end; + NewAcc -> + fun(Next) -> mp_parse_atts(Next, NewAcc) end + end; +mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> + N = num_mp_writers(), + M = length(Counters), + case (M == N) andalso Chunks == [] of + true -> + ok; + false -> + ParentRef = get(mp_parent_ref), + receive + abort_parsing -> + ok; + {get_bytes, Ref, From} -> + C2 = orddict:update_counter(From, 1, Counters), + NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), + mp_parse_atts(eof, NewAcc); + {'DOWN', ParentRef, _, _, _} -> + exit(mp_reader_coordinator_died) + after 3600000 -> + ok + end + end. + +mp_abort_parse_atts(eof, _) -> + ok; +mp_abort_parse_atts(_, _) -> + fun(Next) -> mp_abort_parse_atts(Next, nil) end. + +maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> + receive {get_bytes, Ref, From} -> + NewCounters = orddict:update_counter(From, 1, Counters), + maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) + after 0 -> + % reply to as many writers as possible + NewWaiting = lists:filter(fun(Writer) -> + WhichChunk = orddict:fetch(Writer, Counters), + ListIndex = WhichChunk - Offset, + if ListIndex =< length(Chunks) -> + Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, + false; + true -> + true + end + end, Waiting), + + % check if we can drop a chunk from the head of the list + case Counters of + [] -> + SmallestIndex = 0; + _ -> + SmallestIndex = lists:min(element(2, lists:unzip(Counters))) + end, + Size = length(Counters), + N = num_mp_writers(), + if Size == N andalso SmallestIndex == (Offset+1) -> + NewChunks = tl(Chunks), + NewOffset = Offset+1; + true -> + NewChunks = Chunks, + NewOffset = Offset + end, + + % we should wait for a writer if no one has written the last chunk + LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]), + if LargestIndex >= (Offset + length(Chunks)) -> + % someone has written all possible chunks, keep moving + {Ref, NewChunks, NewOffset, Counters, NewWaiting}; + true -> + ParentRef = get(mp_parent_ref), + receive + abort_parsing -> + abort_parsing; + {'DOWN', ParentRef, _, _, _} -> + exit(mp_reader_coordinator_died); + {get_bytes, Ref, X} -> + C2 = orddict:update_counter(X, 1, Counters), + maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) + end + end + end. + + +num_mp_writers() -> + case erlang:get(mp_att_writers) of + undefined -> 1; + Count -> Count + end. + + +abort_multi_part_stream(Parser) -> + MonRef = erlang:monitor(process, Parser), + Parser ! abort_parsing, + receive + {'DOWN', MonRef, _, _, _} -> ok + after 60000 -> + % One minute is quite on purpose for this timeout. We + % want to try and read data to keep the socket open + % when possible but we also don't want to just make + % this a super long timeout because people have to + % wait this long to see if they just had an error + % like a validate_doc_update failure. + throw(multi_part_abort_timeout) + end. + + +restart_open_doc_revs(Parser, Ref, NewRef) -> + unlink(Parser), + exit(Parser, kill), + flush_parser_messages(Ref), + erlang:error({restart_open_doc_revs, NewRef}). 
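[editor's note] For context, the {get_bytes, Ref, From} / {bytes, Ref, Chunk} messages handled by mp_parse_atts/2 and maybe_send_data/1 are the parser's side of the attachment fan-out: each writer requests chunks in order, and the parser only drops the head chunk once every registered writer has fetched it. A sketch of the writer side, assuming the writer already knows the attachment length, which is effectively what the {follows, Parser, Ref} clause of att_foldl/3 does via fold_streamed_data/4; the module and WriteFun are illustrative only:

-module(att_writer_example).
-export([copy_att/4]).

% Pull chunks from the multipart parser until AttLen bytes have been consumed.
copy_att(_Parser, _Ref, 0, _WriteFun) ->
    ok;
copy_att(Parser, Ref, LenLeft, WriteFun) when LenLeft > 0 ->
    Parser ! {get_bytes, Ref, self()},
    receive
        {bytes, Ref, Bytes} ->
            ok = WriteFun(Bytes),
            copy_att(Parser, Ref, LenLeft - iolist_size(Bytes), WriteFun)
    end.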
+ + +flush_parser_messages(Ref) -> + receive + {headers, Ref, _} -> + flush_parser_messages(Ref); + {body_bytes, Ref, _} -> + flush_parser_messages(Ref); + {body_done, Ref} -> + flush_parser_messages(Ref); + {done, Ref} -> + flush_parser_messages(Ref) + after 0 -> + ok + end. + + +with_ejson_body(#doc{body = Body} = Doc) when is_binary(Body) -> + Doc#doc{body = couch_compress:decompress(Body)}; +with_ejson_body(#doc{body = {_}} = Doc) -> + Doc.
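[editor's note] with_ejson_body/1 exists because a body read back from disk may still be the compressed binary written by the updater; callers that need the EJSON {Props} shape normalise through it first. A minimal sketch, assuming the usual couch_db.hrl record definitions and that couch_doc exports with_ejson_body/1:

-module(doc_body_example).
-export([body_props/1]).

-include_lib("couch/include/couch_db.hrl").

% Accepts a #doc whose body is either a compressed binary (as stored on disk)
% or already-decoded EJSON, and returns the property list either way.
body_props(Doc) ->
    #doc{body = {Props}} = couch_doc:with_ejson_body(Doc),
    Props.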