couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [5/6] Merge remote-tracking branch 'origin/import-master'
Date Tue, 06 May 2014 12:41:00 GMT
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --cc src/couch_db_updater.erl
index 649826a,0000000..901e8c3
mode 100644,000000..100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@@ -1,1264 -1,0 +1,1269 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_db_updater).
 +-behaviour(gen_server).
 +
 +-export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
 +-export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
 +-export([make_doc_summary/2]).
 +-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +% Two-field record holding a `db_header` value and a `meta_state` value.
 +% NOTE(review): not referenced in the part of the file visible here;
 +% presumably used by the compaction code further down -- confirm.
 +-record(comp_header, {
 +    db_header,
 +    meta_state
 +}).
 +
 +% State record with an id tree, a seq tree and three bookkeeping fields.
 +% NOTE(review): not referenced in the part of the file visible here;
 +% presumably the accumulator of a merge step during compaction -- confirm
 +% the meaning of `curr`, `rem_seqs` and `infos` against their users.
 +-record(merge_st, {
 +    id_tree,
 +    seq_tree,
 +    curr,
 +    rem_seqs,
 +    infos
 +}).
 +
 +% gen_server callback: builds the initial #db{} state of the updater.
 +%
 +% The argument is {DbName, Filepath, Fd, Options}. When Options contains
 +% `create`, or when the file holds no valid header, a default #db_header{}
 +% is written to Fd and leftover compaction files are removed; otherwise the
 +% header read from Fd is used as is. Returns {ok, Db} with main_pid set to
 +% the calling process.
 +init({DbName, Filepath, Fd, Options}) ->
 +    CompactSuffixes = [".compact", ".compact.data", ".compact.meta"],
 +    Header = case lists:member(create, Options) of
 +        true ->
 +            % brand new database: write a default header to the file
 +            Fresh = #db_header{},
 +            ok = couch_file:write_header(Fd, Fresh),
 +            % drop any old compaction files that might be hanging around
 +            RootDir = config:get("couchdb", "database_dir", "."),
 +            lists:foreach(fun(Suffix) ->
 +                couch_file:delete(RootDir, Filepath ++ Suffix)
 +            end, CompactSuffixes),
 +            Fresh;
 +        false ->
 +            case couch_file:read_header(Fd) of
 +                {ok, Existing} ->
 +                    Existing;
 +                no_valid_header ->
 +                    % no usable header on disk: start from a default one
 +                    Blank = #db_header{},
 +                    ok = couch_file:write_header(Fd, Blank),
 +                    % drop any old compaction files that might be hanging
 +                    % around
 +                    lists:foreach(fun(Suffix) ->
 +                        file:delete(Filepath ++ Suffix)
 +                    end, CompactSuffixes),
 +                    Blank
 +            end
 +    end,
 +    Db = init_db(DbName, Filepath, Fd, Header, Options),
 +    % Validation funs are not loaded here because the fabric query is liable
 +    % to race conditions. couch_db:validate_doc_update loads them lazily.
 +    {ok, Db#db{main_pid = self()}}.
 +
 +
 +% gen_server callback: closes the database file (when it is still a live
 +% pid) and synchronously shuts down the compactor and the fd process.
 +terminate(_Reason, Db) ->
 +    Fd = Db#db.fd,
 +    % If we are stopping because the fd process disappeared, the fd field
 +    % no longer holds a pid and there is nothing left to close.
 +    if
 +        is_pid(Fd) ->
 +            ok = couch_file:close(Fd);
 +        true ->
 +            ok
 +    end,
 +    couch_util:shutdown_sync(Db#db.compactor_pid),
 +    couch_util:shutdown_sync(Fd),
 +    ok.
 +
 +% gen_server callback for synchronous requests to the database updater.
 +% Every clause answers with {reply, Reply, Db}. Clauses that build a new
 +% #db{} through set_security, set_revs_limit, purge_docs, cancel_compact or
 +% increment_update_seq also hand it to couch_server with a db_updated call.
 +handle_call(get_db, _From, Db) ->
 +    {reply, {ok, Db}, Db};
 +handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
 +    {reply, ok, Db}; % no data waiting, return ok immediately
 +handle_call(full_commit, _From,  Db) ->
 +    {reply, ok, commit_data(Db)};
 +% full commit up to a given sequence: nothing to do when that sequence has
 +% already been committed
 +handle_call({full_commit, RequiredSeq}, _From, Db)
 +        when RequiredSeq =< Db#db.committed_update_seq ->
 +    {reply, ok, Db};
 +handle_call({full_commit, _}, _, Db) ->
 +    {reply, ok, commit_data(Db)}; % commit the data and return ok
 +handle_call(start_compact, _From, Db) ->
 +    % reuse the cast implementation, then reply with the compactor pid
 +    {noreply, NewDb} = handle_cast(start_compact, Db),
 +    {reply, {ok, NewDb#db.compactor_pid}, NewDb};
 +handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
 +    {reply, Pid, Db};
 +handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
 +    {reply, ok, Db};
 +handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
 +    % unlink first so that killing the compactor does not take us down
 +    unlink(Pid),
 +    exit(Pid, kill),
 +    RootDir = config:get("couchdb", "database_dir", "."),
 +    ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
 +    Db2 = Db#db{compactor_pid = nil},
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    {reply, ok, Db2};
 +handle_call(increment_update_seq, _From, Db) ->
 +    % bump the update sequence by one and commit
 +    Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    couch_db_update_notifier:notify({updated, Db#db.name}),
 +    {reply, {ok, Db2#db.update_seq}, Db2};
 +
 +handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
 +    % append the security object to the file and remember its pointer
 +    {ok, Ptr, _} = couch_file:append_term(
 +        Db#db.fd, NewSec, [{compression, Comp}]),
 +    Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
 +            update_seq=Db#db.update_seq+1}),
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    {reply, ok, Db2};
 +
 +handle_call({set_revs_limit, Limit}, _From, Db) ->
 +    Db2 = commit_data(Db#db{revs_limit=Limit,
 +            update_seq=Db#db.update_seq+1}),
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    {reply, ok, Db2};
 +
 +% purging is refused while a compactor is running
 +handle_call({purge_docs, _IdRevs}, _From,
 +        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
 +    {reply, {error, purge_during_compaction}, Db};
 +% IdRevs is a list of {Id, Revs}; replies
 +% {ok, NewPurgeSeq, [{Id, RemovedRevs}]}
 +handle_call({purge_docs, IdRevs}, _From, Db) ->
 +    #db{
 +        fd = Fd,
 +        id_tree = DocInfoByIdBTree,
 +        seq_tree = DocInfoBySeqBTree,
 +        update_seq = LastSeq,
 +        header = Header = #db_header{purge_seq=PurgeSeq},
 +        compression = Comp
 +        } = Db,
 +    DocLookups = couch_btree:lookup(DocInfoByIdBTree,
 +            [Id || {Id, _Revs} <- IdRevs]),
 +
 +    % per requested id: {UpdatedFullDocInfo, RemovedRevs}, or nil when the
 +    % document is missing or none of the given revs could be removed
 +    NewDocInfos = lists:zipwith(
 +        fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
 +            case couch_key_tree:remove_leafs(Tree, Revs) of
 +            {_, []=_RemovedRevs} -> % no change
 +                nil;
 +            {NewTree, RemovedRevs} ->
 +                {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
 +            end;
 +        (_, not_found) ->
 +            nil
 +        end,
 +        IdRevs, DocLookups),
 +
 +    % the generator patterns below silently skip the nil entries
 +    SeqsToRemove = [Seq
 +            || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
 +
 +    % documents that still have revisions left after the purge
 +    FullDocInfoToUpdate = [FullInfo
 +            || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
 +            <- NewDocInfos, Tree /= []],
 +
 +    IdRevsPurged = [{Id, Revs}
 +            || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
 +
 +    % stamp every leaf of each surviving document with the next sequence
 +    % number, threading the running sequence through the fold
 +    {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
 +        fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
 +            Tree2 = couch_key_tree:map_leafs(
 +                fun(_RevId, Leaf) ->
 +                    Leaf#leaf{seq=SeqAcc+1}
 +                end, Tree),
 +            {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
 +        end, LastSeq, FullDocInfoToUpdate),
 +
 +    % documents whose rev tree became empty are dropped from the id tree
 +    IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
 +            <- NewDocInfos],
 +
 +    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
 +            DocInfoToUpdate, SeqsToRemove),
 +    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
 +            FullDocInfoToUpdate, IdsToRemove),
 +    % the list of purged id/revs is appended to the file; its pointer is
 +    % stored in the header as purged_docs
 +    {ok, Pointer, _} = couch_file:append_term(
 +            Fd, IdRevsPurged, [{compression, Comp}]),
 +
 +    Db2 = commit_data(
 +        Db#db{
 +            id_tree = DocInfoByIdBTree2,
 +            seq_tree = DocInfoBySeqBTree2,
 +            update_seq = NewSeq + 1,
 +            header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
 +
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    couch_db_update_notifier:notify({updated, Db#db.name}),
 +    {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}.
 +
 +
 +% gen_server callback for asynchronous requests: installing validation
 +% funs, starting a compaction and finishing one. Any other cast is logged
 +% and stops the server with the message as the reason.
 +handle_cast({load_validation_funs, ValidationFuns}, Db) ->
 +    Db2 = Db#db{validate_doc_funs = ValidationFuns},
 +    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +    {noreply, Db2};
 +handle_cast(start_compact, Db) ->
 +    case Db#db.compactor_pid of
 +    nil ->
 +        ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
 +        Pid = spawn_link(fun() -> start_copy_compact(Db) end),
 +        Db2 = Db#db{compactor_pid=Pid},
 +        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +        {noreply, Db2};
 +    _ ->
 +        % compact currently running, this is a no-op
 +        {noreply, Db}
 +    end;
 +% The compactor reports that CompactFilepath is ready. The compacted file is
 +% promoted only when its update_seq equals ours; otherwise another
 +% compaction pass is started.
 +handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
 +    {ok, NewFd} = couch_file:open(CompactFilepath),
 +    {ok, NewHeader} = couch_file:read_header(NewFd),
 +    #db{update_seq=NewSeq} = NewDb =
 +        init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
 +    unlink(NewFd),
 +    case Db#db.update_seq == NewSeq of
 +    true ->
 +        % suck up all the local docs into memory and write them to the new db
 +        {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
 +                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
 +        {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
 +
 +        % carry over the settings of the running instance and commit
 +        NewDb2 = commit_data(NewDb#db{
 +            local_tree = NewLocalBtree,
 +            main_pid = self(),
 +            filepath = Filepath,
 +            instance_start_time = Db#db.instance_start_time,
 +            revs_limit = Db#db.revs_limit
 +        }),
 +
 +        ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
 +                [Filepath, CompactFilepath]),
 +        % move the compacted file to <Filepath>.compact, delete the old
 +        % main file, then rename the compacted file into its place
 +        ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
 +        RootDir = config:get("couchdb", "database_dir", "."),
 +        couch_file:delete(RootDir, Filepath),
 +        ok = file:rename(Filepath ++ ".compact", Filepath),
 +        % Delete the old meta compaction file after promoting
 +        % the compaction file.
 +        couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
 +        close_db(Db),
 +        NewDb3 = refresh_validate_doc_funs(NewDb2),
 +        ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
 +        couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
 +        ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
 +        {noreply, NewDb3#db{compactor_pid=nil}};
 +    false ->
 +        ?LOG_INFO("Compaction file still behind main file "
 +            "(update seq=~p. compact update seq=~p). Retrying.",
 +            [Db#db.update_seq, NewSeq]),
 +        % drop the monitor on the compacted file and run another pass
 +        close_db(NewDb),
 +        Pid = spawn_link(fun() -> start_copy_compact(Db) end),
 +        Db2 = Db#db{compactor_pid=Pid},
 +        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +        {noreply, Db2}
 +    end;
 +
 +handle_cast(Msg, #db{name = Name} = Db) ->
 +    ?LOG_ERROR("Database `~s` updater received unexpected cast: ~p", [Name, Msg]),
 +    {stop, Msg, Db}.
 +
 +
 +% gen_server callback for plain messages: document update requests, the
 +% delayed_commit timer message, exits of linked processes and the 'DOWN'
 +% message of the monitored fd.
 +handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
 +        FullCommit}, Db) ->
 +    % tag every document with the client that sent it
 +    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
 +    % queued update requests are only batched together when this request
 +    % carries no NonRepDocs (those are handed to update_local_docs/2 later)
 +    if NonRepDocs == [] ->
 +        {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
 +                [Client], MergeConflicts, FullCommit);
 +    true ->
 +        GroupedDocs3 = GroupedDocs2,
 +        FullCommit2 = FullCommit,
 +        Clients = [Client]
 +    end,
 +    NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
 +    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
 +                FullCommit2) of
 +    {ok, Db2, UpdatedDDocIds} ->
 +        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +        if Db2#db.update_seq /= Db#db.update_seq ->
 +            couch_db_update_notifier:notify({updated, Db2#db.name});
 +        true -> ok
 +        end,
 +        % tell every batched client that its update has been processed
 +        [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
 +        lists:foreach(fun(DDocId) ->
 +            couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
 +        end, UpdatedDDocIds),
 +        {noreply, Db2, hibernate}
 +    catch
 +        % flush_trees/3 throws `retry` when attachments were written to a
 +        % different fd; the clients are asked to resubmit
 +        throw: retry ->
 +            [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
 +            {noreply, Db, hibernate}
 +    end;
 +handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
 +    %no outstanding delayed commits, ignore
 +    {noreply, Db};
 +handle_info(delayed_commit, Db) ->
 +    % only notify couch_server when the commit actually changed the state
 +    case commit_data(Db) of
 +        Db ->
 +            {noreply, Db};
 +        Db2 ->
 +            ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
 +            {noreply, Db2}
 +    end;
 +% a linked process exiting normally is ignored; any other exit reason
 +% stops the updater with that reason
 +handle_info({'EXIT', _Pid, normal}, Db) ->
 +    {noreply, Db};
 +handle_info({'EXIT', _Pid, Reason}, Db) ->
 +    {stop, Reason, Db};
 +% the monitored fd process went down: stop, clearing fd so that
 +% terminate/2 does not try to close it again
 +handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
 +    ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]),
 +    {stop, normal, Db#db{fd=undefined, fd_monitor=undefined}}.
 +
 +% gen_server callback for hot code upgrades: the state is kept unchanged.
 +code_change(_FromVsn, Db, _Extra) ->
 +    {ok, Db}.
 +
 +% Merges two lists of document groups, each ordered by document id, into
 +% one ordered list. A group is a non-empty list of {Client, {#doc{}, Ref}}
 +% entries sharing one id; groups with the same id on both sides are
 +% concatenated (entries from the first list come first).
 +merge_updates([], GroupsB) ->
 +    GroupsB;
 +merge_updates(GroupsA, []) ->
 +    GroupsA;
 +merge_updates([[{_,{#doc{id=Id},_}}|_]=GroupA|MoreA],
 +        [[{_,{#doc{id=Id},_}}|_]=GroupB|MoreB]) ->
 +    [GroupA ++ GroupB | merge_updates(MoreA, MoreB)];
 +merge_updates([[{_,{#doc{id=IdA},_}}|_]=GroupA|MoreA],
 +        [[{_,{#doc{id=IdB},_}}|_]|_]=GroupsB) when IdA < IdB ->
 +    [GroupA | merge_updates(MoreA, GroupsB)];
 +merge_updates([[{_,{#doc{id=IdA},_}}|_]|_]=GroupsA,
 +        [[{_,{#doc{id=IdB},_}}|_]=GroupB|MoreB]) when IdA > IdB ->
 +    [GroupB | merge_updates(GroupsA, MoreB)].
 +
 +% Drains the update_docs requests already waiting in the mailbox and folds
 +% them into the current batch. Returns
 +% {GroupedDocs, Clients, FullCommit}, where FullCommit is true as soon as
 +% one of the collected requests asked for a full commit.
 +collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
 +    receive
 +        % Only requests with the same MergeConflicts flag and an empty
 +        % local docs list are collected. Avoiding several _local doc
 +        % updaters in one batch is easier than dealing with their possible
 +        % conflicts, and local doc writes are relatively rare. This can be
 +        % optimized later if really needed.
 +        {update_docs, Sender, SenderGroups, [], MergeConflicts, SenderFullCommit} ->
 +            TagGroup = fun(Group) -> [{Sender, Doc} || Doc <- Group] end,
 +            Tagged = [TagGroup(Group) || Group <- SenderGroups],
 +            Merged = merge_updates(GroupedDocsAcc, Tagged),
 +            collect_updates(Merged, [Sender | ClientsAcc],
 +                    MergeConflicts, (FullCommit or SenderFullCommit))
 +    after 0 ->
 +        % nothing (more) is queued: hand back what has been gathered
 +        {GroupedDocsAcc, ClientsAcc, FullCommit}
 +    end.
 +
 +% Converts an on-disk revision tree into its in-memory form, where node
 +% values are #leaf{} records. Returns {RevTree, LeafsSize}; LeafsSize is the
 +% sum of the sizes of all leaf nodes, or nil once a leaf in the pre 1.2
 +% format (which carries no size) has been seen.
 +rev_tree(DiskTree) ->
 +    ToMemory = fun
 +        (_RevId, {Del, Ptr, Seq}, leaf, _SizeAcc) ->
 +            % pre 1.2 format, will be upgraded on compaction
 +            Leaf = #leaf{deleted=?i2b(Del), ptr=Ptr, seq=Seq},
 +            {Leaf, nil};
 +        (_RevId, {Del, Ptr, Seq}, branch, SizeAcc) ->
 +            Leaf = #leaf{deleted=?i2b(Del), ptr=Ptr, seq=Seq},
 +            {Leaf, SizeAcc};
 +        (_RevId, {Del, Ptr, Seq, Size}, leaf, SizeAcc) ->
 +            SizeAcc2 = sum_leaf_sizes(SizeAcc, Size),
 +            Leaf = #leaf{deleted=?i2b(Del), ptr=Ptr, seq=Seq, size=Size},
 +            {Leaf, SizeAcc2};
 +        (_RevId, {Del, Ptr, Seq, Size}, branch, SizeAcc) ->
 +            Leaf = #leaf{deleted=?i2b(Del), ptr=Ptr, seq=Seq, size=Size},
 +            {Leaf, SizeAcc};
 +        (_RevId, ?REV_MISSING, _Type, SizeAcc) ->
 +            {?REV_MISSING, SizeAcc}
 +    end,
 +    couch_key_tree:mapfold(ToMemory, 0, DiskTree).
 +
 +% Converts an in-memory revision tree into its on-disk form: every #leaf{}
 +% becomes a {Deleted, Pointer, Seq, Size} tuple with Deleted passed through
 +% ?b2i, and ?REV_MISSING values are kept as they are.
 +disk_tree(RevTree) ->
 +    ToDisk = fun
 +        (_RevId, ?REV_MISSING) ->
 +            ?REV_MISSING;
 +        (_RevId, #leaf{deleted=Del, ptr=Ptr, seq=Seq, size=Size}) ->
 +            {?b2i(Del), Ptr, Seq, Size}
 +    end,
 +    couch_key_tree:map(ToDisk, RevTree).
 +
 +% Split callback of the seq btree: turns a #full_doc_info{} into the
 +% {Key, Value} pair stored in the tree, keyed by update sequence.
 +btree_by_seq_split(#full_doc_info{} = Info) ->
 +    #full_doc_info{
 +        id = DocId,
 +        update_seq = UpdateSeq,
 +        deleted = IsDeleted,
 +        rev_tree = RevTree
 +    } = Info,
 +    Value = {DocId, ?b2i(IsDeleted), disk_tree(RevTree)},
 +    {UpdateSeq, Value}.
 +
 +% Join callback of the seq btree: rebuilds a record from a stored
 +% {Key, Value} pair. Values whose second element is an integer yield a
 +% #full_doc_info{}; any other three-element value is treated as the older
 +% layout and yields a #doc_info{}.
 +btree_by_seq_join(UpdateSeq, {DocId, DelInt, DiskTree}) when is_integer(DelInt) ->
 +    {MemTree, TotalLeafSize} = rev_tree(DiskTree),
 +    #full_doc_info{
 +        id = DocId,
 +        update_seq = UpdateSeq,
 +        deleted = ?i2b(DelInt),
 +        rev_tree = MemTree,
 +        leafs_size = TotalLeafSize
 +    };
 +btree_by_seq_join(KeySeq, {DocId, LiveRevs, DeletedRevs}) ->
 +    % Older versions stored #doc_info records in the seq_tree.
 +    % Compact to upgrade.
 +    Live = [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
 +        {Rev, Seq, Bp} <- LiveRevs],
 +    Dead = [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
 +        {Rev, Seq, Bp} <- DeletedRevs],
 +    #doc_info{
 +        id = DocId,
 +        high_seq = KeySeq,
 +        revs = Live ++ Dead
 +    }.
 +
 +% Split callback of the id btree: turns a #full_doc_info{} into the
 +% {Key, Value} pair stored in the tree, keyed by document id.
 +btree_by_id_split(#full_doc_info{} = Info) ->
 +    #full_doc_info{
 +        id = DocId,
 +        update_seq = UpdateSeq,
 +        deleted = IsDeleted,
 +        rev_tree = RevTree
 +    } = Info,
 +    Value = {UpdateSeq, ?b2i(IsDeleted), disk_tree(RevTree)},
 +    {DocId, Value}.
 +
 +% Join callback of the id btree: rebuilds the #full_doc_info{} from a
 +% stored {Key, Value} pair, converting the rev tree back to its in-memory
 +% form and recording the summed leaf sizes.
 +btree_by_id_join(DocId, {UpdateSeq, DelInt, DiskTree}) ->
 +    {MemTree, TotalLeafSize} = rev_tree(DiskTree),
 +    #full_doc_info{
 +        id = DocId,
 +        update_seq = UpdateSeq,
 +        deleted = ?i2b(DelInt),
 +        rev_tree = MemTree,
 +        leafs_size = TotalLeafSize
 +    }.
 +
 +% Reduce callback of the id btree. The reduction is
 +% {NotDeletedCount, DeletedCount, Size}, where Size is the sum of the
 +% leafs_size values, or nil when a size is unknown.
 +btree_by_id_reduce(reduce, FullDocInfos) ->
 +    Count = fun(Info, {Live, Dead, Total}) ->
 +        Total2 = sum_leaf_sizes(Total, Info#full_doc_info.leafs_size),
 +        {LiveInc, DeadInc} = case Info#full_doc_info.deleted of
 +            true -> {0, 1};
 +            false -> {1, 0}
 +        end,
 +        {Live + LiveInc, Dead + DeadInc, Total2}
 +    end,
 +    lists:foldl(Count, {0, 0, 0}, FullDocInfos);
 +btree_by_id_reduce(rereduce, Reds) ->
 +    Combine = fun
 +        ({Live, Dead}, {LiveAcc, DeadAcc, _TotalAcc}) ->
 +            % pre 1.2 format, will be upgraded on compaction
 +            {LiveAcc + Live, DeadAcc + Dead, nil};
 +        ({Live, Dead, Total}, {LiveAcc, DeadAcc, TotalAcc}) ->
 +            TotalAcc2 = sum_leaf_sizes(TotalAcc, Total),
 +            {LiveAcc + Live, DeadAcc + Dead, TotalAcc2}
 +    end,
 +    lists:foldl(Combine, {0, 0, 0}, Reds).
 +
 +% Adds two leaf sizes. `nil` stands for "size unknown" and is contagious:
 +% if either argument is nil the result is nil.
 +sum_leaf_sizes(SizeA, SizeB) when SizeA =:= nil; SizeB =:= nil ->
 +    nil;
 +sum_leaf_sizes(SizeA, SizeB) ->
 +    SizeA + SizeB.
 +
 +% Reduce callback of the seq btree: the reduction is the number of
 +% entries, and a rereduce adds up the partial counts.
 +btree_by_seq_reduce(reduce, DocInfos) ->
 +    erlang:length(DocInfos);
 +btree_by_seq_reduce(rereduce, Counts) ->
 +    lists:foldl(fun(Count, Total) -> Count + Total end, 0, Counts).
 +
 +% Upgrades a record tuple to a newer, longer definition: when Old has fewer
 +% elements than New, the missing trailing elements are copied from New
 +% (normally a record holding the defaults). Otherwise Old is returned
 +% untouched.
 +simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
 +    MissingTail = lists:nthtail(tuple_size(Old), tuple_to_list(New)),
 +    list_to_tuple(tuple_to_list(Old) ++ MissingTail);
 +simple_upgrade_record(Old, _New) ->
 +    Old.
 +
 +-define(OLD_DISK_VERSION_ERROR,
 +    "Database files from versions smaller than 0.10.0 are no longer supported").
 +
 +% Builds the #db{} state for an opened database file.
 +%
 +% Header0 is the header read from (or just written to) Fd. It is first
 +% padded to the current #db_header{} size, then checked against the
 +% supported disk versions; unsupported versions throw
 +% {database_disk_version_error, Message}. The three btrees and the security
 +% object are opened from the pointers in the header, Fd is told about this
 +% process and monitored, and the resulting record is returned.
 +init_db(DbName, Filepath, Fd, Header0, Options) ->
 +    Header1 = simple_upgrade_record(Header0, #db_header{}),
 +    % element 2 of the header tuple is the disk version
 +    Header =
 +    case element(2, Header1) of
 +    1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
 +    2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
 +    3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
 +    4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11
 +    5 -> Header1; % pre 1.2
 +    ?LATEST_DISK_VERSION -> Header1;
 +    _ -> throw({database_disk_version_error, "Incorrect disk header version"})
 +    end,
 +
 +    {ok, FsyncOptions} = couch_util:parse_term(
 +            config:get("couchdb", "fsync_options",
 +                    "[before_header, after_header, on_file_open]")),
 +
 +    case lists:member(on_file_open, FsyncOptions) of
 +    true -> ok = couch_file:sync(Fd);
 +    _ -> ok
 +    end,
 +
 +    Compression = couch_compress:get_compression_method(),
 +
 +    {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd,
 +        [{split, fun ?MODULE:btree_by_id_split/1},
 +        {join, fun ?MODULE:btree_by_id_join/2},
 +        {reduce, fun ?MODULE:btree_by_id_reduce/2},
 +        {compression, Compression}]),
 +    {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd,
 +            [{split, fun ?MODULE:btree_by_seq_split/1},
 +            {join, fun ?MODULE:btree_by_seq_join/2},
 +            {reduce, fun ?MODULE:btree_by_seq_reduce/2},
 +            {compression, Compression}]),
 +    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd,
 +        [{compression, Compression}]),
 +    % no security pointer means an empty security object
 +    case Header#db_header.security_ptr of
 +    nil ->
 +        Security = [],
 +        SecurityPtr = nil;
 +    SecurityPtr ->
 +        {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
 +    end,
 +    % convert start time tuple to microsecs and store as a binary string.
 +    % os:timestamp/0 returns the same {MegaSecs, Secs, MicroSecs} shape as
 +    % the deprecated now/0 without its global lock.
 +    {MegaSecs, Secs, MicroSecs} = os:timestamp(),
 +    StartTime = ?l2b(io_lib:format("~p",
 +            [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
 +    ok = couch_file:set_db_pid(Fd, self()),
 +    #db{
 +        fd=Fd,
 +        fd_monitor = erlang:monitor(process, Fd),
 +        header=Header,
 +        id_tree = IdBtree,
 +        seq_tree = SeqBtree,
 +        local_tree = LocalDocsBtree,
 +        committed_update_seq = Header#db_header.update_seq,
 +        update_seq = Header#db_header.update_seq,
 +        name = DbName,
 +        filepath = Filepath,
 +        security = Security,
 +        security_ptr = SecurityPtr,
 +        instance_start_time = StartTime,
 +        revs_limit = Header#db_header.revs_limit,
 +        fsync_options = FsyncOptions,
 +        options = Options,
 +        compression = Compression,
 +        before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
 +        after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
 +        }.
 +
 +
 +% Drops the monitor this process holds on the database's fd. The fd itself
 +% is not closed here.
 +close_db(#db{} = Db) ->
 +    erlang:demonitor(Db#db.fd_monitor).
 +
 +
 +% Refreshes the validate_doc_update funs cached in the #db{} record.
 +%
 +% For databases whose name starts with "shards/" a background process asks
 +% fabric to reset the funs and the cached value is cleared. For any other
 +% database the design documents are read with an _admin user context and
 +% their validation funs are collected into the record.
 +refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = ShardName} = Db) ->
 +    spawn(fabric, reset_validation_funs, [mem3:dbname(ShardName)]),
 +    Db#db{validate_doc_funs = undefined};
 +refresh_validate_doc_funs(Db0) ->
 +    AdminDb = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}},
 +    {ok, DDocInfos} = couch_db:get_design_docs(AdminDb),
 +    LoadFun = fun(DDocInfo) ->
 +        {ok, DDoc} = couch_db:open_doc_int(
 +            AdminDb, DDocInfo, [ejson_body]),
 +        couch_doc:get_validate_doc_fun(DDoc)
 +    end,
 +    % design documents without a validation fun yield nil and are skipped
 +    ValidateFuns = [VFun || VFun <- lists:map(LoadFun, DDocInfos), VFun =/= nil],
 +    AdminDb#db{validate_doc_funs=ValidateFuns}.
 +
 +% rev tree functions
 +
 +% Writes the unwritten document summaries found in the rev trees of the
 +% given #full_doc_info{} records to the database file.
 +%
 +% Every #doc{} node whose body is {summary, Summary, AttsFd} is appended to
 +% Fd and replaced by a #leaf{} pointing at it. leafs_size is set to the sum
 +% of the sizes of all leaf nodes whose size is known, covering both the
 +% newly written ones and those that were already #leaf{} records.
 +% Returns {ok, FlushedInfos} in the input order.
 +% Throws `retry` when a summary's attachments were written to another fd.
 +flush_trees(_Db, [], AccFlushedTrees) ->
 +    {ok, lists:reverse(AccFlushedTrees)};
 +flush_trees(#db{fd = Fd} = Db,
 +        [InfoUnflushed | RestUnflushed], AccFlushed) ->
 +    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
 +    {Flushed, LeafsSize} = couch_key_tree:mapfold(
 +        fun(_Rev, Value, Type, Acc) ->
 +            case Value of
 +            #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
 +                % This node value is an unwritten document summary that
 +                % must be written to disk. The attachments have already
 +                % been written, so first check that they went to the same
 +                % fd we are about to append the summary to.
 +                case {AttsFd, Fd} of
 +                {nil, _} ->
 +                    ok;
 +                {SameFd, SameFd} ->
 +                    ok;
 +                _ ->
 +                    % Fd where the attachments were written to is not the same
 +                    % as our Fd. This can happen when a database is being
 +                    % switched out during a compaction.
 +                    ?LOG_DEBUG("File where the attachments are written has"
 +                            " changed. Possibly retrying.", []),
 +                    throw(retry)
 +                end,
 +                {ok, NewSummaryPointer, SummarySize} =
 +                    couch_file:append_raw_chunk(Fd, Summary),
 +                % size of a revision = summary size + attachment lengths
 +                TotalSize = lists:foldl(
 +                    fun(#att{att_len = L}, A) -> A + L end,
 +                    SummarySize, Value#doc.atts),
 +                NewValue = #leaf{deleted=IsDeleted, ptr=NewSummaryPointer,
 +                                 seq=UpdateSeq, size=TotalSize},
 +                case Type of
 +                leaf ->
 +                    {NewValue, Acc + TotalSize};
 +                branch ->
 +                    {NewValue, Acc}
 +                end;
 +            #leaf{size = LeafSize} when Type =:= leaf, is_integer(LeafSize) ->
 +                % Already flushed leaf, as produced by rev_tree/1 or by an
 +                % earlier flush. It is a record, so the 4-tuple clause
 +                % below can never match it; count its stored size here.
 +                {Value, Acc + LeafSize};
 +            {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
 +                % legacy plain-tuple node value, kept for compatibility
 +                {Value, Acc + LeafSize};
 +            _ ->
 +                {Value, Acc}
 +            end
 +        end, 0, Unflushed),
 +    InfoFlushed = InfoUnflushed#full_doc_info{
 +        rev_tree = Flushed,
 +        leafs_size = LeafsSize
 +    },
 +    flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
 +
 +
 +% Sends {result, self(), {Ref, NewResult}} to Client. Errors raised by the
 +% send are swallowed by the `catch`.
 +send_result(Client, Ref, NewResult) ->
 +    Reply = {result, self(), {Ref, NewResult}},
 +    catch(Client ! Reply).
 +
 +% Merges batches of new document revisions into the revision trees of the
 +% matching existing documents.
 +%
 +% The third and fourth arguments are walked in lock step: each element of
 +% the docs list is the group of {Client, {#doc{}, Ref}} updates for the
 +% #full_doc_info{} at the same position of the old infos list. Limit is
 +% passed to couch_key_tree:merge/3. When MergeConflicts is false, updates
 +% that would conflict are rejected by sending `conflict` to the client;
 +% when it is true everything is merged unconditionally.
 +%
 +% Returns {ok, NewInfos, RemoveSeqs, NewSeq}: the infos of the documents
 +% whose tree changed (each with a fresh update_seq), the old sequence
 +% numbers they replace, and the last sequence number handed out.
 +merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 +    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
 +merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
 +        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
 +    #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
 +            = OldDocInfo,
 +    % fold every update of this document into the tree; the second element
 +    % of the accumulator tracks whether the document currently counts as
 +    % deleted
 +    {NewRevTree, _} = lists:foldl(
 +        fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
 +            if not MergeConflicts ->
 +                case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
 +                    Limit) of
 +                {_NewTree, conflicts} when (not OldDeleted) ->
 +                    % conflicting edit of a live document: reject it
 +                    send_result(Client, Ref, conflict),
 +                    {AccTree, OldDeleted};
 +                {NewTree, conflicts} when PrevRevs /= [] ->
 +                    % Check to be sure if prev revision was specified, it's
 +                    % a leaf node in the tree
 +                    Leafs = couch_key_tree:get_all_leafs(AccTree),
 +                    IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
 +                            {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
 +                        end, Leafs),
 +                    if IsPrevLeaf ->
 +                        {NewTree, OldDeleted};
 +                    true ->
 +                        send_result(Client, Ref, conflict),
 +                        {AccTree, OldDeleted}
 +                    end;
 +                {NewTree, no_conflicts} when  AccTree == NewTree ->
 +                    % the tree didn't change at all
 +                    % meaning we are saving a rev that's already
 +                    % been edited again.
 +                    if (Pos == 1) and OldDeleted ->
 +                        % this means we are recreating a brand new document
 +                        % into a state that already existed before.
 +                        % put the rev into a subsequent edit of the deletion
 +                        #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
 +                                couch_doc:to_doc_info(OldDocInfo),
 +                        NewRevId = couch_db:new_revid(
 +                                NewDoc#doc{revs={OldPos, [OldRev]}}),
 +                        NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
 +                        {NewTree2, _} = couch_key_tree:merge(AccTree,
 +                                couch_doc:to_path(NewDoc2), Limit),
 +                        % we changed the rev id, this tells the caller we did
 +                        send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
 +                        {NewTree2, OldDeleted};
 +                    true ->
 +                        send_result(Client, Ref, conflict),
 +                        {AccTree, OldDeleted}
 +                    end;
 +                {NewTree, _} ->
 +                    % accepted edit: the deleted flag now follows the new doc
 +                    {NewTree, NewDoc#doc.deleted}
 +                end;
 +            true ->
 +                % MergeConflicts is set: merge without any conflict checks
 +                {NewTree, _} = couch_key_tree:merge(AccTree,
 +                            couch_doc:to_path(NewDoc), Limit),
 +                {NewTree, OldDeleted}
 +            end
 +        end,
 +        {OldTree, OldDeleted0}, NewDocs),
 +    if NewRevTree == OldTree ->
 +        % nothing changed
 +        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
 +            AccNewInfos, AccRemoveSeqs, AccSeq);
 +    true ->
 +        % we have updated the document, give it a new seq #
 +        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
 +        % an old seq of 0 is not scheduled for removal from the seq tree
 +        RemoveSeqs = case OldSeq of
 +            0 -> AccRemoveSeqs;
 +            _ -> [OldSeq | AccRemoveSeqs]
 +        end,
 +        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
 +            [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
 +    end.
 +
 +
 +
 +% Prepares flushed #full_doc_info{} records for the indexes. Each record
 +% gets its `deleted` flag from the first rev info reported by
 +% couch_doc:to_doc_info/1 and is prepended to AccById; ids starting with
 +% the design document prefix are prepended to AccDDocIds.
 +% Returns {ById, DDocIds}.
 +new_index_entries(FullDocInfos, AccById, AccDDocIds) ->
 +    lists:foldl(fun(#full_doc_info{id=DocId}=FDI, {ById, DDocIds}) ->
 +        #doc_info{revs=[#rev_info{deleted=IsDeleted}|_]} =
 +            couch_doc:to_doc_info(FDI),
 +        ById2 = [FDI#full_doc_info{deleted=IsDeleted} | ById],
 +        case DocId of
 +            <<?DESIGN_DOC_PREFIX, _/binary>> -> {ById2, [DocId | DDocIds]};
 +            _ -> {ById2, DDocIds}
 +        end
 +    end, {AccById, AccDDocIds}, FullDocInfos).
 +
 +
 +% Stems the rev tree of every #full_doc_info{} to the database's
 +% revs_limit using couch_key_tree:stem/2.
 +stem_full_doc_infos(#db{revs_limit=RevsLimit}, FullDocInfos) ->
 +    [FDI#full_doc_info{rev_tree=couch_key_tree:stem(RevTree, RevsLimit)}
 +        || #full_doc_info{rev_tree=RevTree}=FDI <- FullDocInfos].
 +
 +% Applies one batch of document updates to the database.
 +%
 +% DocsList is a list of groups of {Client, {#doc{}, Ref}} entries, one
 +% group per document id. NonRepDocs is passed to update_local_docs/2.
 +% MergeConflicts is forwarded to merge_rev_trees/7. FullCommit selects an
 +% immediate commit: commit_data/2 is called with `not FullCommit`.
 +%
 +% Returns {ok, NewDb, UpdatedDDocIds}, where UpdatedDDocIds lists the ids
 +% of the design documents touched by this batch. A `retry` thrown by
 +% flush_trees/3 is not caught here.
 +update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
 +    #db{
 +        id_tree = DocInfoByIdBTree,
 +        seq_tree = DocInfoBySeqBTree,
 +        update_seq = LastSeq,
 +        revs_limit = RevsLimit
 +        } = Db,
 +    Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
 +    % lookup up the old documents, if they exist.
 +    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
 +    OldDocInfos = lists:zipwith(
 +        fun(_Id, {ok, FullDocInfo}) ->
 +            FullDocInfo;
 +        (Id, not_found) ->
 +            % unknown document: start from an empty #full_doc_info{}
 +            #full_doc_info{id=Id}
 +        end,
 +        Ids, OldDocLookups),
 +    % Merge the new docs into the revision trees.
 +    {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
 +            MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
 +
 +    % All documents are now ready to write.
 +
 +    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
 +
 +    % Write out the document summaries (the bodies are stored in the nodes of
 +    % the trees, the attachments are already written to disk)
 +    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
 +
 +    {IndexFullDocInfos, UpdatedDDocIds} =
 +            new_index_entries(FlushedFullDocInfos, [], []),
 +
 +    % and the indexes
 +    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
 +    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
 +
 +    Db3 = Db2#db{
 +        id_tree = DocInfoByIdBTree2,
 +        seq_tree = DocInfoBySeqBTree2,
 +        update_seq = NewSeq},
 +
 +    % Check if we just updated any design documents, and update the validation
 +    % funs if we did. Matching on the list shape avoids walking the whole
 +    % list just to test for emptiness.
 +    Db4 = case UpdatedDDocIds of
 +        [_ | _] ->
 +            ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
 +            refresh_validate_doc_funs(Db3);
 +        [] ->
 +            Db3
 +    end,
 +
 +    {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
 +
 +% Write a batch of local (non-replicated) docs to the local_tree.
 +%
 +% Docs is a list of {Client, {#doc{}, Ref}} entries whose revs must be of
 +% the form {0, PrevRevs}. The new revision number is the integer parsed from
 +% the first element of PrevRevs plus one (or 1 when PrevRevs is empty).
 +% Every client is sent its result via send_result/3 before the btree is
 +% modified. Deleted docs are removed from the tree, all others are stored
 +% as {Id, {Rev, Body}}.
 +%
 +% Returns {ok, Db} with the updated local_tree.
 +update_local_docs(Db, []) ->
 +    {ok, Db};
 +update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
 +    Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
 +    % The lookup results are currently unused by the fun below (conflict
 +    % checking is disabled), but zipwith still requires both lists to have
 +    % the same length.
 +    OldDocLookups = couch_btree:lookup(Btree, Ids),
 +    BtreeEntries = lists:zipwith(
 +        fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, _OldDocLookup) ->
 +            case PrevRevs of
 +            [RevStr|_] ->
 +                PrevRev = list_to_integer(?b2l(RevStr));
 +            [] ->
 +                PrevRev = 0
 +            end,
 +            %% disabled conflict checking for local docs -- APK 16 June 2010
 +            % OldRev =
 +            % case OldDocLookup of
 +            %     {ok, {_, {OldRev0, _}}} -> OldRev0;
 +            %     not_found -> 0
 +            % end,
 +            % case OldRev == PrevRev of
 +            % true ->
 +                case Delete of
 +                    false ->
 +                        send_result(Client, Ref, {ok,
 +                                {0, ?l2b(integer_to_list(PrevRev + 1))}}),
 +                        {update, {Id, {PrevRev + 1, Body}}};
 +                    true  ->
 +                        send_result(Client, Ref,
 +                                {ok, {0, <<"0">>}}),
 +                        {remove, Id}
 +                end%;
 +            % false ->
 +            %     send_result(Client, Ref, conflict),
 +            %     ignore
 +            % end
 +        end, Docs, OldDocLookups),
 +
 +    % Split the tagged entries into the two argument lists of add_remove/3.
 +    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
 +    BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
 +
 +    {ok, Btree2} =
 +        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
 +
 +    {ok, Db#db{local_tree = Btree2}}.
 +
 +% Build a #db_header{} from Header, refreshed with the current update_seq,
 +% btree root states, security pointer and revs_limit of Db.
 +db_to_header(Db, Header) ->
 +    UpdateSeq = Db#db.update_seq,
 +    SeqTreeState = couch_btree:get_state(Db#db.seq_tree),
 +    IdTreeState = couch_btree:get_state(Db#db.id_tree),
 +    LocalTreeState = couch_btree:get_state(Db#db.local_tree),
 +    Header#db_header{
 +        update_seq = UpdateSeq,
 +        seq_tree_state = SeqTreeState,
 +        id_tree_state = IdTreeState,
 +        local_tree_state = LocalTreeState,
 +        security_ptr = Db#db.security_ptr,
 +        revs_limit = Db#db.revs_limit
 +    }.
 +
 +% Commit without delay.
 +commit_data(Db) ->
 +    commit_data(Db, false).
 +
 +% commit_data(Db, Delay)
 +%
 +% Delay == true: schedule a delayed_commit message to self() in 1000 ms
 +% unless one is already pending, and return without writing anything.
 +% Otherwise: cancel any pending timer and write a new header via
 +% sync_header/2, but only if the header derived from Db differs from the
 +% one currently stored in it.
 +commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
 +    DelayTimer = erlang:send_after(1000,self(),delayed_commit),
 +    Db#db{waiting_delayed_commit=DelayTimer};
 +commit_data(Db, true) ->
 +    Db;
 +commit_data(Db, _) ->
 +    #db{
 +        waiting_delayed_commit = PendingTimer,
 +        header = PrevHeader
 +    } = Db,
 +    case is_reference(PendingTimer) of
 +        true -> erlang:cancel_timer(PendingTimer);
 +        false -> ok
 +    end,
 +    CurrHeader = db_to_header(Db, PrevHeader),
 +    if
 +        CurrHeader =:= PrevHeader ->
 +            Db#db{waiting_delayed_commit=nil};
 +        true ->
 +            sync_header(Db, CurrHeader)
 +    end.
 +
 +% Write NewHeader to the database file and return Db with the header,
 +% committed_update_seq and delayed-commit timer fields updated. The file is
 +% synced before and/or after the header write depending on whether
 +% before_header / after_header are members of fsync_options.
 +sync_header(Db, NewHeader) ->
 +    #db{
 +        waiting_delayed_commit = PendingTimer,
 +        fsync_options = FsyncOptions,
 +        filepath = FilePath,
 +        fd = Fd
 +    } = Db,
 +
 +    case is_reference(PendingTimer) of
 +        true -> erlang:cancel_timer(PendingTimer);
 +        false -> ok
 +    end,
 +
 +    SyncBefore = lists:member(before_header, FsyncOptions),
 +    SyncAfter = lists:member(after_header, FsyncOptions),
 +    MaybeSync = fun
 +        (true) -> couch_file:sync(FilePath);
 +        (false) -> ok
 +    end,
 +
 +    MaybeSync(SyncBefore),
 +    ok = couch_file:write_header(Fd, NewHeader),
 +    MaybeSync(SyncAfter),
 +
 +    Db#db{
 +        header=NewHeader,
 +        committed_update_seq=Db#db.update_seq,
 +        waiting_delayed_commit=nil
 +    }.
 +
 +% Read the doc summary stored at SrcSp in SrcDb and copy each of its
 +% attachment streams from SrcFd to DestFd.
 +%
 +% Returns {BodyData, NewBinInfos} where NewBinInfos are 8-tuples
 +% {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc} pointing at the
 +% newly written streams. Old 6-tuple entries and boolean encodings are
 +% upgraded on the way. The md5 reported by the stream copy is compared with
 +% the stored one via check_md5/2.
 +copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
 +    {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
 +    BinInfos = case BinInfos0 of
 +    _ when is_binary(BinInfos0) ->
 +        couch_compress:decompress(BinInfos0);
 +    _ when is_list(BinInfos0) ->
 +        % pre 1.2 file format
 +        BinInfos0
 +    end,
 +    % copy the bin values
 +    NewBinInfos = lists:map(
-         fun({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
++        fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
 +            % 010 UPGRADE CODE
-             {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
++            {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
 +                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
-             {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
-         ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
-             {NewBinSp, AttLen, _, Md5, _IdentityMd5} =
++            check_md5(ExpectedMd5, ActualMd5),
++            {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
++        ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
++            {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
 +                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
++            check_md5(ExpectedMd5, ActualMd5),
 +            Enc = case Enc1 of
 +            true ->
 +                % 0110 UPGRADE CODE
 +                gzip;
 +            false ->
 +                % 0110 UPGRADE CODE
 +                identity;
 +            _ ->
 +                Enc1
 +            end,
-             {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
++            {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
 +        end, BinInfos),
 +    {BodyData, NewBinInfos}.
 +
 +% Walk Infos and the lookup results side by side. A #doc_info{} at the head
 +% of Infos that lines up with an {ok, FullDocInfo} lookup is replaced by
 +% that full doc info (their ids must be equal); any other element is kept
 +% as it is without consuming a lookup. Stops early once either list is
 +% exhausted.
 +merge_lookups(Infos, []) ->
 +    Infos;
 +merge_lookups([], _) ->
 +    [];
 +merge_lookups([#doc_info{id=DocId} | Infos], [{ok, FDI} | Lookups]) ->
 +    % Assert we've matched our lookups
 +    if
 +        DocId == FDI#full_doc_info.id ->
 +            ok;
 +        true ->
 +            erlang:error({mismatched_doc_infos, DocId})
 +    end,
 +    [FDI | merge_lookups(Infos, Lookups)];
 +merge_lookups([Info | Infos], Lookups) ->
 +    [Info | merge_lookups(Infos, Lookups)].
 +
++% Return ok when both digests are identical, otherwise throw md5_mismatch.
++check_md5(Expected, Actual) ->
++    case Actual of
++        Expected -> ok;
++        _ -> throw(md5_mismatch)
++    end.
++
 +% Copy one batch of documents from Db into the compaction target NewDb.
 +%
 +% MixedInfos may contain #doc_info{} and #full_doc_info{} entries; the
 +% #doc_info{} ones are resolved through Db's id_tree (see merge_lookups/2).
 +% Every leaf's summary and attachments are copied to DestFd, the batch is
 +% added to NewDb's seq_tree, and {{Id, Seq}, FullDocInfo} pairs are added
 +% to the emsort held in NewDb#db.id_tree.
 +%
 +% Retry is nil, or an id btree that is used to find update_seqs already
 +% present in the target so they can be removed from its seq_tree.
 +%
 +% Returns the updated NewDb.
 +copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
 +    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
 +    LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
 +    % COUCHDB-968, make sure we prune duplicates during compaction
 +    NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
 +        A =< B
 +    end, merge_lookups(MixedInfos, LookupResults)),
 +
 +    % Rewrite each revision tree: branch nodes become ?REV_MISSING, leaf
 +    % nodes get their body and attachments copied into the new file.
 +    NewInfos1 = lists:map(
 +        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
 +            Info#full_doc_info{rev_tree=couch_key_tree:map(
 +                fun(_, _, branch) ->
 +                    ?REV_MISSING;
 +                (_Rev, #leaf{ptr=Sp}=Leaf, leaf) ->
 +                    {_Body, AttsInfo} = Summary = copy_doc_attachments(
 +                        Db, Sp, DestFd),
 +                    SummaryChunk = make_doc_summary(NewDb, Summary),
 +                    {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
 +                        DestFd, SummaryChunk),
 +                    % Leaf size = summary chunk size + sum of the AttLen
 +                    % field of every attachment.
 +                    TotalLeafSize = lists:foldl(
 +                        fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end,
 +                        SummarySize, AttsInfo),
 +                    Leaf#leaf{ptr=Pos, size=TotalLeafSize}
 +                end, RevTree)}
 +        end, NewInfos0),
 +
 +    % Note: stemming uses the revs_limit of the source Db.
 +    NewInfos = stem_full_doc_infos(Db, NewInfos1),
 +    RemoveSeqs =
 +    case Retry of
 +    nil ->
 +        [];
 +    OldDocIdTree ->
 +        % Compaction is being rerun to catch up to writes during the
 +        % first pass. This means we may have docs that already exist
 +        % in the seq_tree in the .data file. Here we lookup any old
 +        % update_seqs so that they can be removed.
 +        Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
 +        Existing = couch_btree:lookup(OldDocIdTree, Ids),
 +        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
 +    end,
 +
 +    {ok, SeqTree} = couch_btree:add_remove(
 +            NewDb#db.seq_tree, NewInfos, RemoveSeqs),
 +
 +    % During compaction NewDb#db.id_tree is a couch_emsort handle, keyed by
 +    % {Id, Seq}.
 +    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
 +        {{Id, Seq}, FDI}
 +    end, NewInfos),
 +    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
 +    update_compact_task(length(NewInfos)),
 +    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 +
 +
 +% Copy every change in Db's seq_tree that is newer than NewDb0's update_seq
 +% into the compaction target.
 +%
 +% Doc infos are buffered until their accumulated ?term_size reaches the
 +% "database_compaction"/"doc_buffer_size" setting and are then written with
 +% copy_docs/4. Once the copied size reaches
 +% "database_compaction"/"checkpoint_after" (default BufferSize * 10) the
 +% target is checkpointed with commit_compaction_data/1. Progress is
 +% reported through couch_task_status. Finally the security object is
 +% copied when it differs, and the target is committed with Db's update_seq.
 +%
 +% Retry is passed through to copy_docs/4; a non-nil value also marks the
 +% task as a retry when a task is already registered.
 +copy_compact(Db, NewDb0, Retry) ->
 +    Compression = couch_compress:get_compression_method(),
 +    NewDb = NewDb0#db{compression=Compression},
 +    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
 +    BufferSize = list_to_integer(
 +        config:get("database_compaction", "doc_buffer_size", "524288")),
 +    CheckpointAfter = couch_util:to_integer(
 +        config:get("database_compaction", "checkpoint_after",
 +            BufferSize * 10)),
 +
 +    % Fold accumulator:
 +    %   {TargetDb, BufferedInfos (reversed), BufferedSize, CopiedSize}
 +    EnumBySeqFun =
 +    fun(DocInfo, _Offset,
 +            {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
 +
 +        Seq = case DocInfo of
 +            #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
 +            #doc_info{} -> DocInfo#doc_info.high_seq
 +        end,
 +
 +        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
 +        if AccUncopiedSize2 >= BufferSize ->
 +            % Buffer is full: flush it, then checkpoint if enough data has
 +            % been copied since the last checkpoint.
 +            NewDb2 = copy_docs(
 +                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
 +            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
 +            if AccCopiedSize2 >= CheckpointAfter ->
 +                CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
 +                {ok, {CommNewDb2, [], 0, 0}};
 +            true ->
 +                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
 +            end;
 +        true ->
 +            {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
 +                AccCopiedSize}}
 +        end
 +    end,
 +
 +    TaskProps0 = [
 +        {type, database_compaction},
 +        {database, Db#db.name},
 +        {progress, 0},
 +        {changes_done, 0},
 +        {total_changes, TotalChanges}
 +    ],
 +    case (Retry =/= nil) and couch_task_status:is_task_added() of
 +    true ->
 +        couch_task_status:update([
 +            {retry, true},
 +            {progress, 0},
 +            {changes_done, 0},
 +            {total_changes, TotalChanges}
 +        ]);
 +    false ->
 +        couch_task_status:add_task(TaskProps0),
 +        couch_task_status:set_update_frequency(500)
 +    end,
 +
 +    {ok, _, {NewDb2, Uncopied, _, _}} =
 +        couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
 +            {NewDb, [], 0, 0},
 +            [{start_key, NewDb#db.update_seq + 1}]),
 +
 +    % Flush whatever is still buffered after the fold.
 +    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
 +
 +    % copy misc header values
 +    if NewDb3#db.security /= Db#db.security ->
 +        {ok, Ptr, _} = couch_file:append_term(
 +            NewDb3#db.fd, Db#db.security,
 +            [{compression, NewDb3#db.compression}]),
 +        NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
 +    true ->
 +        NewDb4 = NewDb3
 +    end,
 +
 +    commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
 +
 +
 +% Run a complete compaction of Db: open (or resume) the .compact.data and
 +% .compact.meta files, copy purge info and documents, sort and copy the id
 +% index out of the meta file, write the final header, close the new db and
 +% the meta file, and notify Db's main_pid with {compact_done, DName}.
 +start_copy_compact(#db{}=Db) ->
 +    #db{name=Name, filepath=Filepath, options=Options} = Db,
 +    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
 +
 +    {ok, NewDb, DName, DFd, MFd, Retry} =
 +        open_compaction_files(Name, Filepath, Options),
 +    erlang:monitor(process, MFd),
 +
 +    % This is a bit worrisome. init_db/4 will monitor the data fd
 +    % but it doesn't know about the meta fd. For now I'll maintain
 +    % that the data fd is the old normal fd and meta fd is special
 +    % and hope everything works out for the best.
 +    unlink(DFd),
 +
 +    NewDb1 = copy_purge_info(Db, NewDb),
 +    NewDb2 = copy_compact(Db, NewDb1, Retry),
 +    NewDb3 = sort_meta_data(NewDb2),
 +    NewDb4 = commit_compaction_data(NewDb3),
 +    NewDb5 = copy_meta_data(NewDb4),
 +    NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
 +    close_db(NewDb6),
 +
 +    ok = couch_file:close(MFd),
 +    gen_server:cast(Db#db.main_pid, {compact_done, DName}).
 +
 +
 +% Open the ".compact.data" and ".compact.meta" files next to DbFilePath and
 +% decide how to start, based on the headers found in them:
 +%
 +%   * both files carry the same #comp_header{}: resume, restoring the
 +%     emsort state stored in the header;
 +%   * the data file carries a plain #db_header{}: keep the data file,
 +%     reset the meta file and start with a fresh emsort;
 +%   * anything else: reset both files and start from scratch.
 +%
 +% Returns {ok, Db, DataFile, DataFd, MetaFd, Retry}. Retry is the id_tree
 +% of the db initialised from the data file in the first two cases and nil
 +% in the last one.
 +open_compaction_files(DbName, DbFilePath, Options) ->
 +    DataFile = DbFilePath ++ ".compact.data",
 +    MetaFile = DbFilePath ++ ".compact.meta",
 +    {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
 +    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
 +    case {DataHdr, MetaHdr} of
 +        % A is bound twice, so this clause only matches equal headers.
 +        {#comp_header{}=A, #comp_header{}=A} ->
 +            DbHeader = A#comp_header.db_header,
 +            Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
 +            Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
 +            {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
 +        {#db_header{}, _} ->
 +            ok = reset_compaction_file(MetaFd, #db_header{}),
 +            Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
 +            Db1 = bind_emsort(Db0, MetaFd, nil),
 +            {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
 +        _ ->
 +            Header = #db_header{},
 +            ok = reset_compaction_file(DataFd, Header),
 +            ok = reset_compaction_file(MetaFd, Header),
 +            Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
 +            Db1 = bind_emsort(Db0, MetaFd, nil),
 +            {ok, Db1, DataFile, DataFd, MetaFd, nil}
 +    end.
 +
 +
 +% Open FilePath, creating it when it does not exist.
 +% Returns {ok, Fd, Header}, where Header is nil for a newly created file
 +% or a file without a valid header.
 +open_compaction_file(FilePath) ->
-     case couch_file:open(FilePath) of
++    case couch_file:open(FilePath, [nologifmissing]) of
 +        {ok, Fd} ->
 +            case couch_file:read_header(Fd) of
 +                {ok, Header} -> {ok, Fd, Header};
 +                no_valid_header -> {ok, Fd, nil}
 +            end;
 +        {error, enoent} ->
 +            {ok, Fd} = couch_file:open(FilePath, [create]),
 +            {ok, Fd, nil}
 +    end.
 +
 +
 +% Truncate the file behind Fd to zero length and write Header to it.
 +% Returns ok; any other result from couch_file is a badmatch.
 +reset_compaction_file(Fd, Header) ->
 +    ok = couch_file:truncate(Fd, 0),
 +    ok = couch_file:write_header(Fd, Header),
 +    ok.
 +
 +
 +% Carry the purge information of OldDb over to NewDb. When OldDb's header
 +% has a purge_seq greater than zero, the last purged ids/revs are appended
 +% to NewDb's file and NewDb's header is updated with the purge_seq and the
 +% pointer to them. Otherwise NewDb is returned unchanged.
 +copy_purge_info(OldDb, NewDb) ->
 +    OldHdr = OldDb#db.header,
 +    NewHdr = NewDb#db.header,
 +    case OldHdr of
 +        #db_header{purge_seq=PurgeSeq} when PurgeSeq > 0 ->
 +            {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
 +            {ok, Ptr, _} = couch_file:append_term(
 +                NewDb#db.fd, PurgedIdsRevs,
 +                [{compression, NewDb#db.compression}]),
 +            PurgeHdr = NewHdr#db_header{
 +                purge_seq=PurgeSeq,
 +                purged_docs=Ptr
 +            },
 +            NewDb#db{header=PurgeHdr};
 +        _ ->
 +            NewDb
 +    end.
 +
 +
 +% Checkpoint a running compaction. The compaction header is written to the
 +% meta file first and to the data file second, so that a restart can pick
 +% up from where it left off. The result of the second (data file) commit is
 +% returned.
 +commit_compaction_data(#db{}=Db) ->
 +    MetaFd = couch_emsort:get_fd(Db#db.id_tree),
 +    DataFd = Db#db.fd,
 +    commit_compaction_data(Db, MetaFd),
 +    commit_compaction_data(Db, DataFd).
 +
 +
 +% Write a #comp_header{} for the current compaction state to Fd.
 +%
 +% While compacting, Db0#db.id_tree is an emsort handle rather than a btree,
 +% so the id btree is temporarily re-bound from the id_tree_state stored in
 +% the old header in order to build the db header with db_to_header/2. The
 +% emsort state is saved alongside it as meta_state. Fd is synced before the
 +% header is written. The returned db has the new header recorded and the
 +% emsort bound as id_tree again.
 +commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
 +    % Mostly copied from commit_data/2 but I have to
 +    % replace the logic to commit and fsync to a specific
 +    % fd instead of the Filepath stuff that commit_data/2
 +    % does.
 +    DataState = OldHeader#db_header.id_tree_state,
 +    MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
 +    MetaState = couch_emsort:get_state(Db0#db.id_tree),
 +    Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
 +    Header = db_to_header(Db1, OldHeader),
 +    CompHeader = #comp_header{
 +        db_header = Header,
 +        meta_state = MetaState
 +    },
 +    ok = couch_file:sync(Fd),
 +    ok = couch_file:write_header(Fd, CompHeader),
 +    Db2 = Db1#db{
 +        waiting_delayed_commit=nil,
 +        header=Header,
 +        committed_update_seq=Db1#db.update_seq
 +    },
 +    bind_emsort(Db2, MetaFd, MetaState).
 +
 +
 +% Open a couch_emsort on Fd and store it in Db's id_tree field. A State of
 +% nil opens it without options, any other State is passed as the root.
 +bind_emsort(Db, Fd, State) ->
 +    {ok, Ems} = case State of
 +        nil -> couch_emsort:open(Fd);
 +        _ -> couch_emsort:open(Fd, [{root, State}])
 +    end,
 +    Db#db{id_tree=Ems}.
 +
 +
 +% Open the by-id btree rooted at State on Fd, using this module's
 +% split/join/reduce callbacks, and store it in Db's id_tree field.
 +bind_id_tree(Db, Fd, State) ->
 +    BtreeOpts = [
 +        {split, fun ?MODULE:btree_by_id_split/1},
 +        {join, fun ?MODULE:btree_by_id_join/2},
 +        {reduce, fun ?MODULE:btree_by_id_reduce/2}
 +    ],
 +    {ok, IdBtree} = couch_btree:open(State, Fd, BtreeOpts),
 +    Db#db{id_tree=IdBtree}.
 +
 +
 +% Run couch_emsort:merge/1 on the emsort held in id_tree and store the
 +% result back in the db record.
 +sort_meta_data(Db0) ->
 +    Unmerged = Db0#db.id_tree,
 +    {ok, Merged} = couch_emsort:merge(Unmerged),
 +    Db0#db{id_tree=Merged}.
 +
 +
 +% Move the id index out of the emsort (Db#db.id_tree) into a real by-id
 +% btree in the data file. The btree is opened from the id_tree_state in the
 +% header; entries are streamed through merge_docids/2, and whatever is left
 +% in the accumulator afterwards is flushed here. Seqs collected for removal
 +% are deleted from the seq_tree.
 +copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
 +    Src = Db#db.id_tree,
 +    BtreeOpts = [
 +        {split, fun ?MODULE:btree_by_id_split/1},
 +        {join, fun ?MODULE:btree_by_id_join/2},
 +        {reduce, fun ?MODULE:btree_by_id_reduce/2}
 +    ],
 +    {ok, DstIdTree} = couch_btree:open(
 +        Header#db_header.id_tree_state, Fd, BtreeOpts),
 +    {ok, Iter} = couch_emsort:iter(Src),
 +    MergeSt0 = #merge_st{
 +        id_tree=DstIdTree,
 +        seq_tree=Db#db.seq_tree,
 +        rem_seqs=[],
 +        infos=[]
 +    },
 +    #merge_st{
 +        id_tree=MergedIdTree,
 +        seq_tree=MergedSeqTree,
 +        infos=PendingInfos,
 +        rem_seqs=PendingRemSeqs
 +    } = merge_docids(Iter, MergeSt0),
 +    {ok, IdTree} = couch_btree:add(MergedIdTree, PendingInfos),
 +    {ok, SeqTree} = couch_btree:add_remove(MergedSeqTree, [], PendingRemSeqs),
 +    Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 +
 +
 +% Drain the emsort iterator into the #merge_st{} accumulator.
 +%
 +% First clause: once more than 1000 infos are buffered, they are added to
 +% the id btree, the collected seqs are removed from the seq btree, and both
 +% buffers are reset.
 +% Second clause: fetch the next distinct doc id via next_info/3 and buffer
 +% its full doc info together with the seqs of the superseded entries.
 +%
 +% The returned accumulator may still hold unflushed infos and rem_seqs.
 +%
 +% NOTE(review): length/1 in the guard walks the infos list on every call;
 +% tracking a counter in #merge_st{} would avoid that.
 +merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
 +    #merge_st{
 +        id_tree=IdTree0,
 +        seq_tree=SeqTree0,
 +        rem_seqs=RemSeqs
 +    } = Acc,
 +    {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
 +    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
 +    Acc1 = Acc#merge_st{
 +        id_tree=IdTree1,
 +        seq_tree=SeqTree1,
 +        rem_seqs=[],
 +        infos=[]
 +    },
 +    merge_docids(Iter, Acc1);
 +merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
 +    case next_info(Iter, Curr, []) of
 +        {NextIter, NewCurr, FDI, Seqs} ->
 +            Acc1 = Acc#merge_st{
 +                infos = [FDI | Acc#merge_st.infos],
 +                rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
 +                curr = NewCurr
 +            },
 +            merge_docids(NextIter, Acc1);
 +        {finished, FDI, Seqs} ->
 +            Acc#merge_st{
 +                infos = [FDI | Acc#merge_st.infos],
 +                rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
 +                curr = undefined
 +            };
 +        empty ->
 +            Acc
 +    end.
 +
 +
 +% Pull entries from the emsort iterator until the doc id changes.
 +%
 +% The second argument is the entry read ahead so far ({Id, Seq, FDI}) or
 +% undefined. While the next entry has the same Id, it replaces the current
 +% one and the replaced entry's Seq is pushed onto Seqs.
 +%
 +% Returns:
 +%   empty                          - nothing read ahead and iterator done
 +%   {NextIter, NewCurr, FDI, Seqs} - FDI is the last entry for its id,
 +%                                    NewCurr the first entry of the next id
 +%   {finished, FDI, Seqs}          - FDI was the final entry
 +next_info(Iter, undefined, []) ->
 +    case couch_emsort:next(Iter) of
 +        {ok, {{Id, Seq}, FDI}, NextIter} ->
 +            next_info(NextIter, {Id, Seq, FDI}, []);
 +        finished ->
 +            empty
 +    end;
 +next_info(Iter, {Id, Seq, FDI}, Seqs) ->
 +    case couch_emsort:next(Iter) of
 +        % Id is already bound here, so this clause only matches the same id.
 +        {ok, {{Id, NSeq}, NFDI}, NextIter} ->
 +            next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
 +        {ok, {{NId, NSeq}, NFDI}, NextIter} ->
 +            {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
 +        finished ->
 +            {finished, FDI, Seqs}
 +    end.
 +
 +
 +% Add NumChanges to the task's changes_done counter and recompute the
 +% progress as an integer percentage of total_changes (0 when the total
 +% is 0).
 +update_compact_task(NumChanges) ->
 +    [DoneBefore, Total] =
 +        couch_task_status:get([changes_done, total_changes]),
 +    DoneNow = DoneBefore + NumChanges,
 +    Progress = if
 +        Total =:= 0 -> 0;
 +        true -> (DoneNow * 100) div Total
 +    end,
 +    couch_task_status:update([{changes_done, DoneNow}, {progress, Progress}]).
 +
 +
 +% Build the on-disk chunk for a doc summary. Body and attachment info are
 +% compressed with the db's compression method unless they already are
 +% (they are not when coming from a pre 1.2 database file), then the pair is
 +% serialised and assembled into a file chunk together with its md5.
 +make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
 +    EnsureCompressed = fun(Term) ->
 +        case couch_compress:is_compressed(Term, Comp) of
 +            true -> Term;
 +            false -> couch_compress:compress(Term, Comp)
 +        end
 +    end,
 +    Body = EnsureCompressed(Body0),
 +    Atts = EnsureCompressed(Atts0),
 +    SummaryBin = ?term_to_bin({Body, Atts}),
 +    SummaryMd5 = couch_util:md5(SummaryBin),
 +    couch_file:assemble_file_chunk(SummaryBin, SummaryMd5).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_doc.erl
----------------------------------------------------------------------
diff --cc src/couch_doc.erl
index 6f2ca9b,0000000..05202f4
mode 100644,000000..100644
--- a/src/couch_doc.erl
+++ b/src/couch_doc.erl
@@@ -1,784 -1,0 +1,787 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_doc).
 +
 +-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
 +-export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]).
 +-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
 +-export([validate_docid/1]).
 +-export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
 +-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
 +-export([abort_multi_part_stream/1, restart_open_doc_revs/3]).
 +-export([to_path/1]).
 +-export([mp_parse_doc/2]).
 +-export([with_ejson_body/1]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +% Turn a doc into a revision path {StartPos, Branch}: the branch lists the
 +% doc's revision ids oldest first, and StartPos is the position of the
 +% oldest one.
 +-spec to_path(#doc{}) -> path().
 +to_path(#doc{revs={Start, RevIds}}=Doc) ->
 +    OldestFirst = lists:reverse(RevIds),
 +    [Branch] = to_branch(Doc, OldestFirst),
 +    RootPos = Start - length(RevIds) + 1,
 +    {RootPos, Branch}.
 +
 +% Build a single-child chain of tree nodes from a non-empty list of rev
 +% ids. Only the last rev id carries Doc; all ancestors are ?REV_MISSING.
 +-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
 +to_branch(Doc, [RevId | Rest]) ->
 +    case Rest of
 +        [] ->
 +            [{RevId, Doc, []}];
 +        _ ->
 +            [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}]
 +    end.
 +
 +% helpers used by to_json_obj
 +% The "_rev" member for a doc: empty for a doc without revisions,
 +% otherwise "Start-FirstRevId" rendered by rev_to_str/1.
 +to_json_rev(0, []) ->
 +    [];
 +to_json_rev(Start, [FirstRevId|_]) ->
 +    [{<<"_rev">>, rev_to_str({Start, FirstRevId})}].
 +
 +% The body members of the JSON doc; a deleted doc gets a trailing
 +% "_deleted": true member.
 +to_json_body(Deleted, {Body}) when is_boolean(Deleted) ->
 +    if
 +        Deleted -> Body ++ [{<<"_deleted">>, true}];
 +        true -> Body
 +    end.
 +
 +% The "_revisions" member, emitted only when the revs option is present.
 +to_json_revisions(Options, Start, RevIds) ->
 +    case lists:member(revs, Options) of
 +    true ->
 +        Ids = [revid_to_str(R) || R <- RevIds],
 +        Revisions = {[{<<"start">>, Start}, {<<"ids">>, Ids}]},
 +        [{<<"_revisions">>, Revisions}];
 +    false ->
 +        []
 +    end.
 +
 +% Render a rev id for JSON output: ids of size 16 are hex encoded, all
 +% other ids are returned unchanged.
 +revid_to_str(RevId) ->
 +    if
 +        size(RevId) =:= 16 ->
 +            ?l2b(couch_util:to_hex(RevId));
 +        true ->
 +            RevId
 +    end.
 +
 +% Render {Pos, RevId} as the binary "Pos-RevId".
 +rev_to_str({Pos, RevId}) ->
 +    PosStr = integer_to_list(Pos),
 +    RevIdStr = revid_to_str(RevId),
 +    ?l2b([PosStr, "-", RevIdStr]).
 +
 +
 +% Render a list of {Pos, RevId} pairs with rev_to_str/1, preserving order.
 +revs_to_strs(Revs) ->
 +    lists:map(fun rev_to_str/1, Revs).
 +
 +% Convert the doc's meta entries into their JSON members
 +% (_revs_info, _local_seq, _conflicts, _deleted_conflicts).
 +to_json_meta(Meta) ->
 +    % Positions count down from Start, one per revs_info entry.
 +    RevInfoToJson = fun({RevId, Status}, Pos) ->
 +        Entry = {[
 +            {<<"rev">>, rev_to_str({Pos, RevId})},
 +            {<<"status">>, ?l2b(atom_to_list(Status))}
 +        ]},
 +        {Entry, Pos - 1}
 +    end,
 +    MetaToJson = fun
 +        ({revs_info, Start, RevsInfo}) ->
 +            {JsonRevsInfo, _Pos} =
 +                lists:mapfoldl(RevInfoToJson, Start, RevsInfo),
 +            {<<"_revs_info">>, JsonRevsInfo};
 +        ({local_seq, Seq}) ->
 +            {<<"_local_seq">>, Seq};
 +        ({conflicts, Conflicts}) ->
 +            {<<"_conflicts">>, revs_to_strs(Conflicts)};
 +        ({deleted_conflicts, DConflicts}) ->
 +            {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
 +    end,
 +    lists:map(MetaToJson, Meta).
 +
 +% Translate the attachments, follows and att_encoding_info options into
 +% the three boolean flags expected by to_json_attachments/4.
 +to_json_attachments(Attachments, Options) ->
 +    OutputData = lists:member(attachments, Options),
 +    DataToFollow = lists:member(follows, Options),
 +    ShowEncInfo = lists:member(att_encoding_info, Options),
 +    to_json_attachments(Attachments, OutputData, DataToFollow, ShowEncInfo).
 +
 +% Build the "_attachments" member; returns [] when there are none.
 +%
 +% Each attachment always gets content_type and revpos, plus a digest when
 +% its md5 is not empty. The data part depends on the flags:
 +%   * OutputData false, or the attachment is a stub: length + stub=true
 +%   * DataToFollow true: length + follows=true
 +%   * otherwise: the attachment body inline as base64 (gzip-encoded
 +%     attachments are gunzipped first)
 +% With ShowEncInfo, attachments whose encoding is not identity also get
 +% encoding and encoded_length.
 +to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
 +    [];
 +to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
 +    AttProps = lists:map(
 +        fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
 +            {Att#att.name, {[
 +                {<<"content_type">>, Att#att.type},
 +                {<<"revpos">>, Att#att.revpos}] ++
 +                case Att#att.md5 of
 +                    <<>> ->
 +                        [];
 +                    Md5 ->
 +                        EncodedMd5 = base64:encode(Md5),
 +                        [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}]
 +                end ++
 +                if not OutputData orelse Att#att.data == stub ->
 +                    [{<<"length">>, DiskLen}, {<<"stub">>, true}];
 +                true ->
 +                    if DataToFollow ->
 +                        [{<<"length">>, DiskLen}, {<<"follows">>, true}];
 +                    true ->
 +                        % Only gzip and identity are handled here; any other
 +                        % encoding raises a case_clause error.
 +                        AttData = case Enc of
 +                        gzip ->
 +                            zlib:gunzip(att_to_bin(Att));
 +                        identity ->
 +                            att_to_bin(Att)
 +                        end,
 +                        [{<<"data">>, base64:encode(AttData)}]
 +                    end
 +                end ++
 +                    case {ShowEncInfo, Enc} of
 +                    {false, _} ->
 +                        [];
 +                    {true, identity} ->
 +                        [];
 +                    {true, _} ->
 +                        [
 +                            {<<"encoding">>, couch_util:to_binary(Enc)},
 +                            {<<"encoded_length">>, AttLen}
 +                        ]
 +                    end
 +            }}
 +        end, Atts),
 +    [{<<"_attachments">>, {AttProps}}].
 +
 +% Convert a #doc{} to its EJSON object, after passing it through
 +% with_ejson_body/1.
 +to_json_obj(Doc, Options) ->
 +    EJsonDoc = with_ejson_body(Doc),
 +    doc_to_json_obj(EJsonDoc, Options).
 +
 +% Assemble the EJSON object for a doc. Member order is: _id, _rev, body
 +% members (plus _deleted), _revisions, meta members, _attachments.
 +doc_to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
 +            meta=Meta}=Doc,Options)->
 +    IdProps = [{<<"_id">>, Id}],
 +    RevProps = to_json_rev(Start, RevIds),
 +    BodyProps = to_json_body(Del, Body),
 +    RevisionsProps = to_json_revisions(Options, Start, RevIds),
 +    MetaProps = to_json_meta(Meta),
 +    AttProps = to_json_attachments(Doc#doc.atts, Options),
 +    {lists:append([
 +        IdProps,
 +        RevProps,
 +        BodyProps,
 +        RevisionsProps,
 +        MetaProps,
 +        AttProps
 +    ])}.
 +
 +% Build a #doc{} from an EJSON object. Anything that is not an object
 +% ({Props}) is rejected with a bad_request throw.
 +from_json_obj(Json) ->
 +    case Json of
 +        {Props} ->
 +            transfer_fields(Props, #doc{body=[]});
 +        _Other ->
 +            throw({bad_request, "Document must be a JSON object"})
 +    end.
 +
 +% Convert a rev id (binary or string) to its internal binary form.
 +% Ids of exactly 32 characters are read as a base-16 number and packed
 +% into a 128-bit binary; every other id is kept as is (strings are
 +% converted to a binary).
 +%
 +% NOTE(review): a 32-character id that is not valid hex makes
 +% list_to_integer/2 raise badarg here -- confirm whether callers expect
 +% that or a bad_request throw.
 +parse_revid(RevId) when size(RevId) =:= 32 ->
 +    RevInt = erlang:list_to_integer(?b2l(RevId), 16),
 +     <<RevInt:128>>;
 +parse_revid(RevId) when length(RevId) =:= 32 ->
 +    RevInt = erlang:list_to_integer(RevId, 16),
 +     <<RevInt:128>>;
 +parse_revid(RevId) when is_binary(RevId) ->
 +    RevId;
 +parse_revid(RevId) when is_list(RevId) ->
 +    ?l2b(RevId).
 +
 +
 +% Parse a revision of the form "Pos-RevId" (binary or string) into
 +% {Pos, RevId}, where Pos is an integer and RevId is converted with
 +% parse_revid/1.
 +%
 +% Throws {bad_request, <<"Invalid rev format">>} when the input is neither
 +% a binary nor a list, contains no "-", or has a position part that is not
 +% an integer.
 +parse_rev(Rev) when is_binary(Rev) ->
 +    parse_rev(?b2l(Rev));
 +parse_rev(Rev) when is_list(Rev) ->
 +    SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
 +    case SplitRev of
 +        {Pos, [$- | RevId]} ->
 +            % A non-numeric (or empty) position is a client error, not a
 +            % crash: report it like every other malformed rev.
 +            IntPos = try list_to_integer(Pos) of
 +                Val -> Val
 +            catch
 +                error:badarg ->
 +                    throw({bad_request, <<"Invalid rev format">>})
 +            end,
 +            {IntPos, parse_revid(RevId)};
 +        _Else -> throw({bad_request, <<"Invalid rev format">>})
 +    end;
 +parse_rev(_BadRev) ->
 +    throw({bad_request, <<"Invalid rev format">>}).
 +
 +% Parse every revision in the list with parse_rev/1, preserving order.
 +parse_revs(Revs) ->
 +    lists:map(fun parse_rev/1, Revs).
 +
 +
 +% Validate a document id. Returns ok or throws {bad_request, Reason}.
 +% An id must be a non-empty binary holding valid UTF-8, and may only start
 +% with an underscore when it begins with "_design/" or "_local/".
 +validate_docid(<<"">>) ->
 +    throw({bad_request, <<"Document id must not be empty">>});
 +validate_docid(Id) when is_binary(Id) ->
 +    case couch_util:validate_utf8(Id) of
 +        false -> throw({bad_request, <<"Document id must be valid UTF-8">>});
 +        true -> ok
 +    end,
 +    % Clause order matters: the two reserved prefixes are checked before
 +    % the generic leading-underscore rejection.
 +    case Id of
 +    <<"_design/", _/binary>> -> ok;
 +    <<"_local/", _/binary>> -> ok;
 +    <<"_", _/binary>> ->
 +        throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
 +    _Else -> ok
 +    end;
 +validate_docid(Id) ->
 +    ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
 +    throw({bad_request, <<"Document id must be a string">>}).
 +
 +% Fold the members of a JSON object into a #doc{}.
 +%
 +% Ordinary members are accumulated (in reverse) in #doc.body and turned
 +% back into an EJSON object once the input is exhausted. Members starting
 +% with an underscore are handled specially:
 +%   _id, _rev, _attachments, _revisions, _deleted - stored in the matching
 +%       #doc{} fields
 +%   _revs_info, _local_seq, _conflicts, _deleted_conflicts - ignored
 +%   _replication_* (state, state_time, state_reason, id, stats) - kept in
 +%       the body
 +%   anything else - rejected with a doc_validation throw
 +transfer_fields([], #doc{body=Fields}=Doc) ->
 +    % convert fields back to json object
 +    Doc#doc{body={lists:reverse(Fields)}};
 +
 +transfer_fields([{<<"_id">>, Id} | Rest], Doc) ->
 +    validate_docid(Id),
 +    transfer_fields(Rest, Doc#doc{id=Id});
 +
 +% _rev is only used while no revisions have been set yet.
 +transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) ->
 +    {Pos, RevId} = parse_rev(Rev),
 +    transfer_fields(Rest,
 +            Doc#doc{revs={Pos, [RevId]}});
 +
 +transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
 +    % we already got the rev from the _revisions
 +    transfer_fields(Rest,Doc);
 +
 +% Each attachment is a stub, a "follows" placeholder, or inline base64
 +% data. The digest is only stored for the first two forms.
 +transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
 +    Atts = lists:map(fun({Name, {BinProps}}) ->
 +        Md5 = case couch_util:get_value(<<"digest">>, BinProps) of
 +            <<"md5-",EncodedMd5/binary>> ->
 +                base64:decode(EncodedMd5);
 +            _ ->
 +               <<>>
 +        end,
 +        case couch_util:get_value(<<"stub">>, BinProps) of
 +        true ->
 +            Type = couch_util:get_value(<<"content_type">>, BinProps),
 +            RevPos = couch_util:get_value(<<"revpos">>, BinProps, nil),
 +            DiskLen = couch_util:get_value(<<"length">>, BinProps),
 +            {Enc, EncLen} = att_encoding_info(BinProps),
 +            #att{name=Name, data=stub, type=Type, att_len=EncLen,
 +                disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5};
 +        _ ->
 +            Type = couch_util:get_value(<<"content_type">>, BinProps,
 +                    ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
 +            RevPos = couch_util:get_value(<<"revpos">>, BinProps, 0),
 +            case couch_util:get_value(<<"follows">>, BinProps) of
 +            true ->
 +                DiskLen = couch_util:get_value(<<"length">>, BinProps),
 +                {Enc, EncLen} = att_encoding_info(BinProps),
 +                #att{name=Name, data=follows, type=Type, encoding=Enc,
 +                    att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5};
 +            _ ->
 +                Value = couch_util:get_value(<<"data">>, BinProps),
 +                Bin = base64:decode(Value),
 +                LenBin = size(Bin),
 +                #att{name=Name, data=Bin, type=Type, att_len=LenBin,
 +                        disk_len=LenBin, revpos=RevPos}
 +            end
 +        end
 +    end, JsonBins),
 +    transfer_fields(Rest, Doc#doc{atts=Atts});
 +
 +% _revisions overwrites whatever revs were set before (including _rev).
 +transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
 +    RevIds = couch_util:get_value(<<"ids">>, Props),
 +    Start = couch_util:get_value(<<"start">>, Props),
 +    if not is_integer(Start) ->
 +        throw({doc_validation, "_revisions.start isn't an integer."});
 +    not is_list(RevIds) ->
 +        throw({doc_validation, "_revisions.ids isn't a array."});
 +    true ->
 +        ok
 +    end,
 +    [throw({doc_validation, "RevId isn't a string"}) ||
 +            RevId <- RevIds, not is_binary(RevId)],
 +    RevIds2 = [parse_revid(RevId) || RevId <- RevIds],
 +    transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}});
 +
 +% A non-boolean _deleted falls through to the "unknown special field"
 +% clause below.
 +transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when is_boolean(B) ->
 +    transfer_fields(Rest, Doc#doc{deleted=B});
 +
 +% ignored fields
 +transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) ->
 +    transfer_fields(Rest, Doc);
 +transfer_fields([{<<"_local_seq">>, _} | Rest], Doc) ->
 +    transfer_fields(Rest, Doc);
 +transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) ->
 +    transfer_fields(Rest, Doc);
 +transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
 +    transfer_fields(Rest, Doc);
 +
 +% special fields for replication documents
 +transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
 +    #doc{body=Fields} = Doc) ->
 +    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 +transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
 +    #doc{body=Fields} = Doc) ->
 +    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
++transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest],
++    #doc{body=Fields} = Doc) ->
++    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 +transfer_fields([{<<"_replication_id">>, _} = Field | Rest],
 +    #doc{body=Fields} = Doc) ->
 +    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 +transfer_fields([{<<"_replication_stats">>, _} = Field | Rest],
 +    #doc{body=Fields} = Doc) ->
 +    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 +
 +% unknown special field
 +transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
 +    throw({doc_validation,
 +            ?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
 +
 +transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
 +    transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
 +
 +% Derive {Encoding, EncodedLength} from an attachment's JSON properties.
 +% When the "encoding" member is undefined the result is
 +% {identity, Length}, with Length taken from the "length" member.
 +% Otherwise the encoding name is converted to an already existing atom and
 +% paired with the "encoded_length" member (Length is passed as the default
 +% for that lookup).
 +att_encoding_info(BinProps) ->
 +    Length = couch_util:get_value(<<"length">>, BinProps),
 +    Encoding = couch_util:get_value(<<"encoding">>, BinProps),
 +    if Encoding =:= undefined ->
 +        {identity, Length};
 +    true ->
 +        EncLength = couch_util:get_value(<<"encoded_length">>, BinProps, Length),
 +        {list_to_existing_atom(?b2l(Encoding)), EncLength}
 +    end.
 +
 +% Return only the first element of the pair built by to_doc_info_path/1
 +% (the #doc_info{}), discarding the winning revision path.
 +to_doc_info(FullDocInfo) ->
 +    {Info, _WinningPath} = to_doc_info_path(FullDocInfo),
 +    Info.
 +
 +% Return the largest sequence number among UpdateSeq and the values
 +% visited by couch_key_tree:fold/3 over Tree. Three value layouts are
 +% understood; anything else leaves the running maximum unchanged.
 +max_seq(Tree, UpdateSeq) ->
 +    couch_key_tree:fold(fun
 +        ({_Pos, _Key}, {_Deleted, _DiskPos, Seq}, _Type, Max) ->
 +            % Older versions didn't track data sizes.
 +            erlang:max(Max, Seq);
 +        ({_Pos, _Key}, {_Deleted, _DiskPos, Seq, _Size}, _Type, Max) ->
 +            % Older versions didn't store #leaf records.
 +            % (the original author was unsure whether this clause is needed)
 +            erlang:max(Max, Seq);
 +        ({_Pos, _Key}, #leaf{seq=Seq}, _Type, Max) ->
 +            erlang:max(Max, Seq);
 +        ({_Pos, _Key}, _Other, _Type, Max) ->
 +            Max
 +    end, UpdateSeq, Tree).
 +
 +% Build the #doc_info{} for a #full_doc_info{} together with the path of
 +% the winning revision. One #rev_info{} is produced for each leaf returned
 +% by couch_key_tree:get_all_leafs/1. The list is sorted descending by
 +% {not deleted, rev}, so non-deleted leaves come before deleted ones and
 +% the head of the sorted list is the winner.
 +to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=FDISeq}) ->
 +    Leafs = couch_key_tree:get_all_leafs(Tree),
 +    Unsorted = [
 +        {#rev_info{
 +            rev = {Pos, RevId},
 +            seq = Leaf#leaf.seq,
 +            body_sp = Leaf#leaf.ptr,
 +            deleted = Leaf#leaf.deleted
 +        }, Path} || {Leaf, {Pos, [RevId | _]} = Path} <- Leafs
 +    ],
 +    % sort descending by {not deleted, rev}
 +    Precedes = fun({#rev_info{deleted=DelA,rev=RevA}, _PathA},
 +                   {#rev_info{deleted=DelB,rev=RevB}, _PathB}) ->
 +        {not DelA, RevA} > {not DelB, RevB}
 +    end,
 +    Sorted = lists:sort(Precedes, Unsorted),
 +    [{_Winner, WinPath}|_] = Sorted,
 +    {RevInfos, _Paths} = lists:unzip(Sorted),
 +    HighSeq = max_seq(Tree, FDISeq),
 +    {#doc_info{id=Id, high_seq=HighSeq, revs=RevInfos}, WinPath}.
 +
 +
 +
 +
 +% Fold Fun over the data of an attachment, whatever form it is held in:
 +%   * a binary: Fun is applied once to the whole binary;
 +%   * {Fd, Sp}: delegated to couch_stream:foldl/5 with the stored md5;
 +%   * a fun: chunks are pulled with fold_streamed_data/4 until att_len
 +%     bytes have been consumed;
 +%   * {follows, Parser, Ref}: chunks are requested from the Parser process
 +%     with {get_bytes, Ref, self()} messages. The parser is monitored for
 +%     the duration of the fold and {mp_parser_died, Reason} is thrown if
 +%     it goes down.
 +att_foldl(#att{data=Data}, Fun, Acc) when is_binary(Data) ->
 +    Fun(Data, Acc);
 +att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
 +    couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
 +att_foldl(#att{data=RcvFun,att_len=Len}, Fun, Acc) when is_function(RcvFun) ->
 +    fold_streamed_data(RcvFun, Len, Fun, Acc);
 +att_foldl(#att{data={follows, Parser, Ref}}=Att, Fun, Acc) ->
 +    MonRef = erlang:monitor(process, Parser),
 +    NextChunk = fun() ->
 +        Parser ! {get_bytes, Ref, self()},
 +        receive
 +            {started_open_doc_revs, NewRef} ->
 +                couch_doc:restart_open_doc_revs(Parser, Ref, NewRef);
 +            {bytes, Ref, Chunk} ->
 +                Chunk;
 +            {'DOWN', MonRef, _, _, Why} ->
 +                throw({mp_parser_died, Why})
 +        end
 +    end,
 +    try
 +        % re-enter through the fun clause above with the chunk fetcher
 +        att_foldl(Att#att{data=NextChunk}, Fun, Acc)
 +    after
 +        erlang:demonitor(MonRef, [flush])
 +    end.
 +
 +% Fold Fun over the part of an attachment delimited by From and To.
 +% Only attachments whose data is an {Fd, Sp} pair are accepted; the work
 +% is done by couch_stream:range_foldl/6.
 +range_att_foldl(#att{data = {Fd, Sp}}, From, To, Fun, Acc) ->
 +    couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
 +
 +% Fold Fun over the decoded content of an attachment. Data held as an
 +% {Fd, Sp} pair is handed to couch_stream:foldl_decode/6 together with the
 +% attachment's md5 and encoding. Identity-encoded data in any other form
 +% is treated as a chunk-producing fun and read with fold_streamed_data/4.
 +att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
 +    couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
 +att_foldl_decode(#att{data=RcvFun,att_len=Len,encoding=identity}, Fun, Acc) ->
 +    fold_streamed_data(RcvFun, Len, Fun, Acc).
 +
 +% Return the complete data of an attachment as a single binary.
 +% Binaries are returned as they are, lists are flattened with
 +% iolist_to_binary/1, and {Fd, Sp} pairs or chunk-producing funs are read
 +% chunk by chunk and then concatenated.
 +att_to_bin(#att{data=Data}) when is_binary(Data) ->
 +    Data;
 +att_to_bin(#att{data=Data}) when is_list(Data) ->
 +    iolist_to_binary(Data);
 +att_to_bin(#att{data={_Fd,_Sp}}=Att) ->
 +    Collect = fun(Chunk, Chunks) -> [Chunk | Chunks] end,
 +    % chunks are accumulated newest first, hence the reverse
 +    Reversed = att_foldl(Att, Collect, []),
 +    iolist_to_binary(lists:reverse(Reversed));
 +att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun) ->
 +    Collect = fun(Chunk, Chunks) -> [Chunk | Chunks] end,
 +    Reversed = fold_streamed_data(DataFun, Len, Collect, []),
 +    iolist_to_binary(lists:reverse(Reversed)).
 +
 +% Return a 4-arity validation fun for a design document, or nil when the
 +% document body has no "validate_doc_update" member. A design document
 +% given as a JSON object ({Props}) is first converted with
 +% couch_doc:from_json_obj/1. The returned fun forwards to
 +% couch_query_servers:validate_doc_update/5 with the design document as
 +% first argument.
 +get_validate_doc_fun({Props}) ->
 +    DDoc = couch_doc:from_json_obj({Props}),
 +    get_validate_doc_fun(DDoc);
 +get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
 +    Source = couch_util:get_value(<<"validate_doc_update">>, Props),
 +    if Source =:= undefined ->
 +        nil;
 +    true ->
 +        fun(EditDoc, DiskDoc, Ctx, SecObj) ->
 +            couch_query_servers:validate_doc_update(
 +                DDoc, EditDoc, DiskDoc, Ctx, SecObj)
 +        end
 +    end.
 +
 +
 +% True when the document (or the given list of attachments) contains at
 +% least one attachment whose data is the atom `stub`.
 +has_stubs(#doc{atts=Atts}) ->
 +    has_stubs(Atts);
 +has_stubs(Atts) when is_list(Atts) ->
 +    lists:any(fun(#att{data=stub}) -> true; (_Att) -> false end, Atts).
 +
 +% Replace every stub attachment of the first document with the attachment
 +% of the same name from the second (on-disk) document.
 +% Throws {missing_stub, Message} when there is no on-disk document (nil),
 +% when no attachment of that name exists on disk, or when the revpos
 +% values differ and the stub's revpos is not nil.
 +% Attachments that are not stubs are kept unchanged.
 +merge_stubs(#doc{id = Id}, nil) ->
 +    throw({missing_stub, <<"Previous revision missing for document ", Id/binary>>});
 +merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
 +    DiskByName = dict:from_list(
 +        [{Name, Att} || #att{name=Name}=Att <- DiskBins]),
 +    Resolve = fun
 +        (#att{name=Name, data=stub, revpos=StubRevPos}) ->
 +            case dict:find(Name, DiskByName) of
 +            {ok, #att{revpos=DiskRevPos}=DiskAtt}
 +                    when DiskRevPos == StubRevPos orelse StubRevPos == nil ->
 +                DiskAtt;
 +            _ ->
 +                throw({missing_stub,
 +                        <<"id:", Id/binary, ", name:", Name/binary>>})
 +            end;
 +        (Att) ->
 +            Att
 +    end,
 +    MergedBins = lists:map(Resolve, MemBins),
 +    StubsDoc#doc{atts = MergedBins}.
 +
 +% Repeatedly call RcvFun/0 to obtain the next chunk and fold Fun over the
 +% chunks until exactly LenLeft bytes have been consumed. Returns the final
 +% accumulator. If the chunks overshoot LenLeft no clause matches and the
 +% call fails.
 +fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
 +    Acc;
 +fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0 ->
 +    Chunk = RcvFun(),
 +    NextAcc = Fun(Chunk, Acc),
 +    Remaining = LenLeft - size(Chunk),
 +    fold_streamed_data(RcvFun, Remaining, Fun, NextAcc).
 +
 +% Work out the content type and the body length for a document with
 +% attachments; the byte counts mirror what doc_to_multi_part_stream/5 and
 +% atts_to_mp/4 write for the same arguments.
 +% Returns {<<"application/json">>, Len} when no attachment contributes any
 +% bytes (there are none, or all of them are stubs), otherwise
 +% {<<"multipart/related; boundary=\"...\"">>, Len}.
 +% SendEncodedAtts =:= true counts att_len bytes per attachment, any other
 +% value counts disk_len bytes.
 +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) ->
 +    % length of one complete attachment part, trailing boundary included
 +    PartLen = fun(Att) ->
 +        #att{
 +            name=Name,
 +            type=Type,
 +            encoding=Encoding,
 +            att_len=AttLen,
 +            disk_len=DiskLen
 +        } = Att,
 +        BodyLen = case SendEncodedAtts of
 +            true -> AttLen;
 +            _ -> DiskLen
 +        end,
 +        EncodingLen = case Encoding of
 +            identity ->
 +                0;
 +            _ ->
 +                length("\r\nContent-Encoding: ") +
 +                length(atom_to_list(Encoding))
 +        end,
 +        length("\r\nContent-Disposition: attachment; filename=\"\"") +
 +        size(Name) +
 +        length("\r\nContent-Type: ") +
 +        size(Type) +
 +        length("\r\nContent-Length: ") +
 +        length(integer_to_list(BodyLen)) +
 +        EncodingLen +
 +        4 + % "\r\n\r\n"
 +        BodyLen +
 +        4 + % "\r\n--"
 +        size(Boundary)
 +    end,
 +    AttsSize = lists:foldl(fun
 +        (#att{data=stub}, Total) ->
 +            % stubs are not sent, so they add nothing
 +            Total;
 +        (Att, Total) ->
 +            Total + PartLen(Att)
 +    end, 0, Atts),
 +    case AttsSize of
 +    0 ->
 +        {<<"application/json">>, iolist_size(JsonBytes)};
 +    _ ->
 +        ContentType =
 +            <<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
 +        TotalLen =
 +            2 + % "--"
 +            size(Boundary) +
 +            36 + % "\r\ncontent-type: application/json\r\n\r\n"
 +            iolist_size(JsonBytes) +
 +            4 + % "\r\n--"
 +            size(Boundary) +
 +            AttsSize +
 +            2, % "--"
 +        {ContentType, TotalLen}
 +    end.
 +
 +% Write a document through WriteFun. When at least one attachment is not
 +% a stub, the JSON is written as the first part of a multipart body and
 +% atts_to_mp/4 writes the attachments after it. Otherwise only JsonBytes
 +% is written.
 +doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
 +    SendEncodedAtts) ->
 +    IsInline = fun(#att{data=Data}) -> Data /= stub end,
 +    case lists:any(IsInline, Atts) of
 +    false ->
 +        WriteFun(JsonBytes);
 +    true ->
 +        JsonPart = [
 +            <<"--", Boundary/binary,
 +                "\r\nContent-Type: application/json\r\n\r\n">>,
 +            JsonBytes,
 +            <<"\r\n--", Boundary/binary>>
 +        ],
 +        WriteFun(JsonPart),
 +        atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts)
 +    end.
 +
 +% Write every non-stub attachment as a MIME part through WriteFun, then
 +% finish the multipart body with the trailing "--". Each part consists of
 +% Content-Disposition, Content-Type and Content-Length headers (plus
 +% Content-Encoding unless the encoding is identity), a blank line, the
 +% attachment bytes and the next boundary marker.
 +% SendEncodedAtts selects whether the encoded bytes (att_len, read with
 +% att_foldl/3) or the decoded bytes (disk_len, read with
 +% att_foldl_decode/3) are sent; it must be true or false.
 +atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
 +    WriteFun(<<"--">>);
 +atts_to_mp([#att{data=stub} | Rest], Boundary, WriteFun, SendEncodedAtts) ->
 +    atts_to_mp(Rest, Boundary, WriteFun, SendEncodedAtts);
 +atts_to_mp([Att | Rest], Boundary, WriteFun, SendEncodedAtts) ->
 +    #att{
 +        name=Name,
 +        type=Type,
 +        encoding=Encoding,
 +        att_len=AttLen,
 +        disk_len=DiskLen
 +    } = Att,
 +
 +    % choose the advertised length and the matching fold before anything
 +    % is written
 +    {Len, FoldAtt} = case SendEncodedAtts of
 +    true -> {AttLen, fun att_foldl/3};
 +    false -> {DiskLen, fun att_foldl_decode/3}
 +    end,
 +    LengthBin = list_to_binary(integer_to_list(Len)),
 +
 +    % write headers
 +    WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
 +    WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
 +    WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
 +    if Encoding =:= identity ->
 +        ok;
 +    true ->
 +        EncodingBin = atom_to_binary(Encoding, latin1),
 +        WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
 +    end,
 +
 +    % write data
 +    WriteFun(<<"\r\n\r\n">>),
 +    FoldAtt(Att, fun(Data, _) -> WriteFun(Data) end, ok),
 +    WriteFun(<<"\r\n--", Boundary/binary>>),
 +    atts_to_mp(Rest, Boundary, WriteFun, SendEncodedAtts).
 +
 +
 +% Same as doc_from_multi_part_stream/3 with a freshly created reference.
 +doc_from_multi_part_stream(ContentType, DataFun) ->
 +    Ref = make_ref(),
 +    doc_from_multi_part_stream(ContentType, DataFun, Ref).
 +
 +
 +% Parse a multipart/related request body into a #doc{}.
 +%
 +% A linked parser process is spawned to run
 +% couch_httpd:parse_multipart_request/3 over DataFun. The JSON part is
 +% requested from it with a {get_doc_bytes, Ref, self()} message and
 +% decoded; attachments whose data is `follows` are rewritten to
 +% {follows, Parser, Ref} so that their bytes can be fetched from the
 +% parser later.
 +%
 +% Returns {ok, Doc, WaitFun, Parser}. WaitFun blocks until the parser
 +% process is down and then sets mochiweb_request_recv in the process
 +% dictionary. If a {started_open_doc_revs, NewRef} message arrives instead
 +% of the document bytes, restart_open_doc_revs/3 is called, which raises
 +% an error rather than returning.
 +doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
 +    Parent = self(),
 +    NumMpWriters = num_mp_writers(),
 +    Parser = spawn_link(fun() ->
 +        ParentRef = erlang:monitor(process, Parent),
 +        put(mp_parent_ref, ParentRef),
 +        % The process dictionary is per process, so the caller's writer
 +        % count has to be copied into the parser. It must be stored under
 +        % mp_att_writers, the key num_mp_writers/0 reads; under any other
 +        % key the parser would always assume a single writer.
 +        put(mp_att_writers, NumMpWriters),
 +        {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
 +            ContentType, DataFun,
 +            fun(Next) -> mp_parse_doc(Next, []) end),
 +        unlink(Parent)
 +        end),
 +    ParserRef = erlang:monitor(process, Parser),
 +    Parser ! {get_doc_bytes, Ref, self()},
 +    receive
 +    {started_open_doc_revs, NewRef} ->
 +        restart_open_doc_revs(Parser, Ref, NewRef);
 +    {doc_bytes, Ref, DocBytes} ->
 +        Doc = from_json_obj(?JSON_DECODE(DocBytes)),
 +        % we'll send the Parser process ID to the remote nodes so they can
 +        % retrieve their own copies of the attachment data
 +        Atts2 = lists:map(
 +            fun(#att{data=follows}=A) ->
 +                A#att{data={follows, Parser, Ref}};
 +            (A) ->
 +                A
 +            end, Doc#doc.atts),
 +        WaitFun = fun() ->
 +            receive {'DOWN', ParserRef, _, _, _} -> ok end,
 +            erlang:put(mochiweb_request_recv, true)
 +        end,
 +        {ok, Doc#doc{atts=Atts2}, WaitFun, Parser}
 +    end.
 +
 +
 +% Multipart parser callback for the first (JSON) part of the body.
 +% Each call handles one parser event and returns the fun to call with the
 +% next event:
 +%   {headers, H}  - only a content-type of "application/json" is accepted;
 +%   {body, Bytes} - the chunk is prepended to the accumulator;
 +%   body_end      - waits for a {get_doc_bytes, Ref, From} request,
 +%                   replies with the chunks in arrival order and hands
 +%                   over to mp_parse_atts/2 with an empty state.
 +mp_parse_doc({headers, Headers}, []) ->
 +    ContentType = couch_util:get_value("content-type", Headers),
 +    case ContentType of
 +    {"application/json", _Params} ->
 +        fun(Next) -> mp_parse_doc(Next, []) end
 +    end;
 +mp_parse_doc({body, Bytes}, AccBytes) ->
 +    Collected = [Bytes | AccBytes],
 +    fun(Next) -> mp_parse_doc(Next, Collected) end;
 +mp_parse_doc(body_end, AccBytes) ->
 +    Ref = receive
 +        {get_doc_bytes, ReqRef, From} ->
 +            From ! {doc_bytes, ReqRef, lists:reverse(AccBytes)},
 +            ReqRef
 +    end,
 +    InitialState = {Ref, [], 0, orddict:new(), []},
 +    fun(Next) -> mp_parse_atts(Next, InitialState) end.
 +
 +% Multipart parser callback for the attachment parts.
 +% The accumulator is the {Ref, Chunks, Offset, Counters, Waiting} state
 +% that maybe_send_data/1 operates on.
 +%   {headers, _} / body_end - ignored, parsing continues with the same
 +%                             state;
 +%   {body, Bytes}           - the chunk is appended to Chunks and offered
 +%                             to the writers; if maybe_send_data/1 reports
 +%                             abort_parsing the rest of the input is
 +%                             consumed by mp_abort_parse_atts/2;
 +%   eof                     - returns ok once every expected writer has
 +%                             made a request and no chunks are buffered;
 +%                             otherwise keeps serving get_bytes requests
 +%                             until that holds, an abort is requested, or
 +%                             nothing arrives for an hour.
 +% Exits with mp_reader_coordinator_died if the monitored parent goes down
 +% while waiting at eof.
 +mp_parse_atts({headers, _}, Acc) ->
 +    fun(Next) -> mp_parse_atts(Next, Acc) end;
 +mp_parse_atts(body_end, Acc) ->
 +    fun(Next) -> mp_parse_atts(Next, Acc) end;
 +mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
 +    % chunks are kept oldest first because writers address them by index
 +    case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of
 +        abort_parsing ->
 +            fun(Next) -> mp_abort_parse_atts(Next, nil) end;
 +        NewAcc ->
 +            fun(Next) -> mp_parse_atts(Next, NewAcc) end
 +    end;
 +mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
 +    N = num_mp_writers(),
 +    M = length(Counters),
 +    case (M == N) andalso Chunks == [] of
 +    true ->
 +        ok;
 +    false ->
 +        ParentRef = get(mp_parent_ref),
 +        receive
 +        abort_parsing ->
 +            ok;
 +        {get_bytes, Ref, From} ->
 +            C2 = orddict:update_counter(From, 1, Counters),
 +            case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
 +            abort_parsing ->
 +                % maybe_send_data/1 returns the bare atom when an abort
 +                % arrives while it waits for a writer. Treat it like the
 +                % abort_parsing message above instead of recursing with
 +                % a value that no clause of this function accepts.
 +                ok;
 +            NewAcc ->
 +                mp_parse_atts(eof, NewAcc)
 +            end;
 +        {'DOWN', ParentRef, _, _, _} ->
 +            exit(mp_reader_coordinator_died)
 +        after 3600000 ->
 +            ok
 +        end
 +    end.
 +
 +% Parser callback used after an abort: every event is discarded until eof
 +% is seen, at which point ok is returned. For any other event the result
 +% is the fun to call with the next event.
 +mp_abort_parse_atts(eof, _State) ->
 +    ok;
 +mp_abort_parse_atts(_Event, _State) ->
 +    fun(Next) -> mp_abort_parse_atts(Next, nil) end.
 +
 +% Serve buffered attachment chunks to the writer processes asking for
 +% them.
 +%
 +% The state is {Ref, Chunks, Offset, Counters, Waiting}:
 +%   Chunks   - chunks still buffered, oldest first;
 +%   Offset   - number of chunks already dropped from the front, so the
 +%              chunk a writer's counter refers to sits at position
 +%              Counter - Offset in Chunks;
 +%   Counters - orddict from writer pid to the number of get_bytes
 +%              requests that writer has made;
 +%   Waiting  - writers whose latest request has not been answered yet.
 +%
 +% Returns the updated state, or the atom abort_parsing when an
 +% abort_parsing message arrives while blocked waiting for a writer. Exits
 +% with mp_reader_coordinator_died if the monitored parent goes down.
 +maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
 +    receive
 +        {get_bytes, Ref, Writer} ->
 +            % drain all pending requests before answering any of them
 +            Counters1 = orddict:update_counter(Writer, 1, Counters),
 +            maybe_send_data({Ref, Chunks, Offset, Counters1, [Writer|Waiting]})
 +    after 0 ->
 +        NumChunks = length(Chunks),
 +        {_Writers, Indexes} = lists:unzip(Counters),
 +
 +        % reply to as many writers as possible
 +        StillWaiting = lists:filter(fun(Writer) ->
 +            case orddict:fetch(Writer, Counters) - Offset of
 +                Index when Index =< NumChunks ->
 +                    Writer ! {bytes, Ref, lists:nth(Index, Chunks)},
 +                    false;
 +                _NotBufferedYet ->
 +                    true
 +            end
 +        end, Waiting),
 +
 +        % check if we can drop a chunk from the head of the list
 +        SmallestIndex = case Indexes of
 +            [] -> 0;
 +            _ -> lists:min(Indexes)
 +        end,
 +        AllWritersSeen = length(Counters) == num_mp_writers(),
 +        {NewChunks, NewOffset} =
 +            case AllWritersSeen andalso SmallestIndex == (Offset + 1) of
 +                true -> {tl(Chunks), Offset + 1};
 +                false -> {Chunks, Offset}
 +            end,
 +
 +        % we should wait for a writer if no one has written the last chunk
 +        LargestIndex = lists:max([0|Indexes]),
 +        case LargestIndex >= (Offset + NumChunks) of
 +            true ->
 +                % someone has written all possible chunks, keep moving
 +                {Ref, NewChunks, NewOffset, Counters, StillWaiting};
 +            false ->
 +                ParentRef = get(mp_parent_ref),
 +                receive
 +                    abort_parsing ->
 +                        abort_parsing;
 +                    {'DOWN', ParentRef, _, _, _} ->
 +                        exit(mp_reader_coordinator_died);
 +                    {get_bytes, Ref, NextWriter} ->
 +                        Counters2 = orddict:update_counter(NextWriter, 1, Counters),
 +                        maybe_send_data({Ref, NewChunks, NewOffset, Counters2,
 +                            [NextWriter|StillWaiting]})
 +                end
 +        end
 +    end.
 +
 +
 +% Number of attachment writers expected by the calling process, read from
 +% the mp_att_writers entry of its process dictionary. Defaults to 1 when
 +% the entry is not set.
 +num_mp_writers() ->
 +    Configured = erlang:get(mp_att_writers),
 +    if Configured =:= undefined ->
 +        1;
 +    true ->
 +        Configured
 +    end.
 +
 +
 +% Ask the multipart parser process to stop and wait for it to terminate.
 +% Returns ok once the parser is down; throws multi_part_abort_timeout if
 +% it is still alive after one minute.
 +abort_multi_part_stream(Parser) ->
 +    MonRef = erlang:monitor(process, Parser),
 +    Parser ! abort_parsing,
 +    receive
 +        {'DOWN', MonRef, _, _, _} -> ok
 +    after 60000 ->
 +        % One minute is quite on purpose for this timeout. We
 +        % want to try and read data to keep the socket open
 +        % when possible but we also don't want to just make
 +        % this a super long timeout because people have to
 +        % wait this long to see if they just had an error
 +        % like a validate_doc_update failure.
 +        %
 +        % Release the monitor before giving up so that a late 'DOWN'
 +        % message cannot end up in the caller's mailbox.
 +        erlang:demonitor(MonRef, [flush]),
 +        throw(multi_part_abort_timeout)
 +    end.
 +
 +
 +% Tear down the parser process and signal that the request has to be
 +% restarted: the parser is unlinked and killed, messages tagged with Ref
 +% that it left behind are flushed from the mailbox, and the call then
 +% fails with error {restart_open_doc_revs, NewRef}. It never returns
 +% normally.
 +restart_open_doc_revs(Parser, Ref, NewRef) ->
 +    % unlink first so that the kill below cannot take this process down
 +    true = unlink(Parser),
 +    true = exit(Parser, kill),
 +    ok = flush_parser_messages(Ref),
 +    erlang:error({restart_open_doc_revs, NewRef}).
 +
 +
 +% Remove every parser message tagged with Ref that is currently in the
 +% caller's mailbox: {headers, Ref, _}, {body_bytes, Ref, _},
 +% {body_done, Ref} and {done, Ref}. Never blocks; other messages are left
 +% where they are. Always returns ok.
 +flush_parser_messages(Ref) ->
 +    receive
 +        {Tag, Ref, _Payload} when Tag =:= headers; Tag =:= body_bytes ->
 +            flush_parser_messages(Ref);
 +        {Tag, Ref} when Tag =:= body_done; Tag =:= done ->
 +            flush_parser_messages(Ref)
 +    after 0 ->
 +        ok
 +    end.
 +
 +
 +% Make sure the document body is in its {Props} form. A binary body is
 +% passed through couch_compress:decompress/1; a body that already is a
 +% one-element tuple is returned untouched.
 +with_ejson_body(#doc{body = Compressed} = Doc) when is_binary(Compressed) ->
 +    EJson = couch_compress:decompress(Compressed),
 +    Doc#doc{body = EJson};
 +with_ejson_body(#doc{body = {_Props}} = Doc) ->
 +    Doc.


Mime
View raw message