couchdb-commits mailing list archives

From rnew...@apache.org
Subject [6/6] couch commit: updated refs/heads/master to 9d629ff
Date Tue, 06 May 2014 12:41:01 GMT
Merge remote-tracking branch 'origin/import-master'

Conflicts:
	include/couch_db.hrl
	priv/couch_js/sm170.c
	priv/couch_js/sm180.c
	priv/couch_js/sm185.c
	src/couch.erl
	src/couch_db_updater.erl
	src/couch_httpd.erl
	src/couch_httpd_misc_handlers.erl
	src/couch_httpd_oauth.erl
	src/couch_httpd_rewrite.erl
	src/couch_os_process.erl
	src/couch_query_servers.erl
	src/couch_server.erl
	src/couch_util.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/9d629ff6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/9d629ff6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/9d629ff6

Branch: refs/heads/master
Commit: 9d629ff64e8d365dfa0e9261ea5a33c380c4e9aa
Parents: d04839e bc467c3
Author: Robert Newson <rnewson@apache.org>
Authored: Tue May 6 13:28:41 2014 +0100
Committer: Robert Newson <rnewson@apache.org>
Committed: Tue May 6 13:39:31 2014 +0100

----------------------------------------------------------------------
 include/couch_db.hrl               |   3 +
 priv/couch_js/help.h               |   4 +-
 priv/couch_js/utf8.c               |  29 ++--
 priv/couch_js/util.c               |   2 +-
 src/couch_changes.erl              |  16 +-
 src/couch_config.erl               | 251 --------------------------------
 src/couch_config_writer.erl        |  88 -----------
 src/couch_db.erl                   |   9 +-
 src/couch_db_updater.erl           |  19 ++-
 src/couch_doc.erl                  |   3 +
 src/couch_httpd.erl                | 149 ++++++++-----------
 src/couch_httpd_auth.erl           |   8 +-
 src/couch_httpd_cors.erl           |  26 ++--
 src/couch_httpd_db.erl             |  32 +++-
 src/couch_httpd_external.erl       |  18 ++-
 src/couch_httpd_misc_handlers.erl  |  41 +++++-
 src/couch_httpd_oauth.erl          |   3 +-
 src/couch_httpd_rewrite.erl        |  75 +++++-----
 src/couch_httpd_stats_handlers.erl |   2 +-
 src/couch_js_functions.hrl         |  15 ++
 src/couch_native_process.erl       |  11 +-
 src/couch_os_daemons.erl           |  12 +-
 src/couch_os_process.erl           |  38 +----
 src/couch_query_servers.erl        |  15 +-
 src/couch_secondary_sup.erl        |   9 +-
 src/couch_server.erl               |   7 +-
 src/couch_users_db.erl             |  15 +-
 src/couch_util.erl                 |  32 ++++
 28 files changed, 339 insertions(+), 593 deletions(-)
----------------------------------------------------------------------
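
For context on the diffstat above: this merge drops the bundled
couch_config server (src/couch_config.erl and src/couch_config_writer.erl,
deleted outright) in favour of the standalone `config` application, which
is why call sites such as couch_changes:get_changes_timeout/2 below read
settings via config:get/3. A minimal sketch of a post-merge call site;
the module name is hypothetical, and the section/key/default are copied
from the diff below:

    -module(config_example).
    -export([changes_timeout/0]).

    %% Reads the changes-feed timeout the way the merged couch_changes
    %% module does: the standalone config application returns the stored
    %% string (or the supplied default), which the caller converts.
    changes_timeout() ->
        list_to_integer(config:get("httpd", "changes_timeout", "60000")).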


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/include/couch_db.hrl
----------------------------------------------------------------------
diff --cc include/couch_db.hrl
index fdc353b,0000000..1fa16f2
mode 100644,000000..100644
--- a/include/couch_db.hrl
+++ b/include/couch_db.hrl
@@@ -1,256 -1,0 +1,259 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-define(LOCAL_DOC_PREFIX, "_local/").
 +-define(DESIGN_DOC_PREFIX0, "_design").
 +-define(DESIGN_DOC_PREFIX, "_design/").
 +-define(DEFAULT_COMPRESSION, snappy).
 +
 +-define(MIN_STR, <<"">>).
 +-define(MAX_STR, <<255>>). % illegal utf string
 +
++-define(REWRITE_COUNT, couch_rewrite_count).
++
 +-define(JSON_ENCODE(V), couch_util:json_encode(V)).
 +-define(JSON_DECODE(V), couch_util:json_decode(V)).
 +
 +-define(b2l(V), binary_to_list(V)).
 +-define(l2b(V), list_to_binary(V)).
 +-define(i2b(V), couch_util:integer_to_boolean(V)).
 +-define(b2i(V), couch_util:boolean_to_integer(V)).
 +-define(term_to_bin(T), term_to_binary(T, [{minor_version, 1}])).
 +-define(term_size(T),
 +    try
 +        erlang:external_size(T)
 +    catch _:_ ->
 +        byte_size(?term_to_bin(T))
 +    end).
 +
 +-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
 +
 +-define(LOG_DEBUG(Format, Args), couch_log:debug(Format, Args)).
 +-define(LOG_INFO(Format, Args), couch_log:info(Format, Args)).
 +-define(LOG_WARN(Format, Args), couch_log:warning(Format, Args)).
 +-define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)).
 +
 +% Tree::term() is really a tree(), but we don't want to require R13B04 yet
 +-type branch() :: {Key::term(), Value::term(), Tree::term()}.
 +-type path() :: {Start::pos_integer(), branch()}.
 +-type tree() :: [branch()]. % sorted by key
 +
 +-record(rev_info,
 +    {
 +    rev,
 +    seq = 0,
 +    deleted = false,
 +    body_sp = nil % stream pointer
 +    }).
 +
 +-record(doc_info,
 +    {
 +    id = <<"">>,
 +    high_seq = 0,
 +    revs = [] % rev_info
 +    }).
 +
 +-record(full_doc_info,
 +    {id = <<"">>,
 +    update_seq = 0,
 +    deleted = false,
 +    rev_tree = [],
 +    leafs_size = 0
 +    }).
 +
 +-record(httpd,
 +    {mochi_req,
 +    peer,
 +    method,
 +    requested_path_parts,
 +    path_parts,
 +    db_url_handlers,
 +    user_ctx,
 +    req_body = undefined,
 +    design_url_handlers,
 +    auth,
 +    default_fun,
 +    url_handlers
 +    }).
 +
 +
 +-record(doc,
 +    {
 +    id = <<"">>,
 +    revs = {0, []},
 +
 +    % the json body object.
 +    body = {[]},
 +
 +    atts = [], % attachments
 +
 +    deleted = false,
 +
 +    % key/value tuple of meta information, provided when using special options:
 +    % couch_db:open_doc(Db, Id, Options).
 +    meta = []
 +    }).
 +
 +
 +-record(att,
 +    {
 +    name,
 +    type,
 +    att_len,
 +    disk_len, % length of the attachment in its identity form
 +              % (that is, without a content encoding applied to it)
 +              % differs from att_len when encoding /= identity
 +    md5= <<>>,
 +    revpos=0,
 +    data,
 +    encoding=identity % currently supported values are:
 +                      %     identity, gzip
 +                      % additional values to support in the future:
 +                      %     deflate, compress
 +    }).
 +
 +
 +-record(user_ctx,
 +    {
 +    name=null,
 +    roles=[],
 +    handler
 +    }).
 +
 +% This should be updated anytime a header change happens that requires more
 +% than filling in new defaults.
 +%
 +% As long as the changes are limited to new header fields (with inline
 +% defaults) added to the end of the record, then there is no need to increment
 +% the disk revision number.
 +%
 +% if the disk revision is incremented, then new upgrade logic will need to be
 +% added to couch_db_updater:init_db.
 +
 +-define(LATEST_DISK_VERSION, 6).
 +
 +-record(db_header,
 +    {disk_version = ?LATEST_DISK_VERSION,
 +     update_seq = 0,
 +     unused = 0,
 +     id_tree_state = nil,
 +     seq_tree_state = nil,
 +     local_tree_state = nil,
 +     purge_seq = 0,
 +     purged_docs = nil,
 +     security_ptr = nil,
 +     revs_limit = 1000
 +    }).
 +
 +-record(db,
 +    {main_pid = nil,
 +    compactor_pid = nil,
 +    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
 +    fd,
 +    fd_monitor,
 +    header = #db_header{},
 +    committed_update_seq,
 +    id_tree,
 +    seq_tree,
 +    local_tree,
 +    update_seq,
 +    name,
 +    filepath,
 +    validate_doc_funs = undefined,
 +    security = [],
 +    security_ptr = nil,
 +    user_ctx = #user_ctx{},
 +    waiting_delayed_commit = nil,
 +    revs_limit = 1000,
 +    fsync_options = [],
 +    options = [],
 +    compression,
 +    before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
 +    after_doc_read = nil     % nil | fun(Doc, Db) -> NewDoc
 +    }).
 +
 +-record(view_fold_helper_funs, {
 +    reduce_count,
 +    passed_end,
 +    start_response,
 +    send_row
 +}).
 +
 +-record(reduce_fold_helper_funs, {
 +    start_response,
 +    send_row
 +}).
 +
 +-record(extern_resp_args, {
 +    code = 200,
 +    stop = false,
 +    data = <<>>,
 +    ctype = "application/json",
 +    headers = [],
 +    json = nil
 +}).
 +
 +-record(index_header,
 +    {seq=0,
 +    purge_seq=0,
 +    id_btree_state=nil,
 +    view_states=nil
 +    }).
 +
 +% small value used in revision trees to indicate the revision isn't stored
 +-define(REV_MISSING, []).
 +
 +-record(changes_args, {
 +    feed = "normal",
 +    dir = fwd,
 +    since = 0,
 +    limit = 1000000000000000,
 +    style = main_only,
 +    heartbeat,
 +    timeout,
 +    filter = "",
 +    filter_fun,
 +    filter_args = [],
 +    include_docs = false,
++    doc_options = [],
 +    conflicts = false,
 +    db_open_options = []
 +}).
 +
 +-record(btree, {
 +    fd,
 +    root,
 +    extract_kv,
 +    assemble_kv,
 +    less,
 +    reduce = nil,
 +    compression = ?DEFAULT_COMPRESSION
 +}).
 +
 +-record(proc, {
 +    pid,
 +    lang,
 +    client = nil,
 +    ddoc_keys = [],
 +    prompt_fun,
 +    prompt_many_fun,
 +    set_timeout_fun,
 +    stop_fun
 +}).
 +
 +-record(leaf,  {
 +    deleted,
 +    ptr,
 +    seq,
 +    size = nil
 +}).
 +
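
The hunk above adds a doc_options field (default []) to #changes_args{},
alongside the new ?REWRITE_COUNT define. The couch_changes diff below
threads that field through #changes_acc{} into couch_doc:to_json_obj/2,
so callers that set include_docs can also control how each included doc
is serialized. A hypothetical caller, assuming `attachments` is among
the couch_doc:to_json_obj/2 options one might pass through:

    -module(changes_args_example).
    -include_lib("couch/include/couch_db.hrl").
    -export([args/0]).

    %% Builds a #changes_args{} that asks the changes feed to inline
    %% each changed document and to serialize it with extra options.
    args() ->
        #changes_args{
            feed = "normal",
            include_docs = true,
            %% forwarded to couch_doc:to_json_obj(Doc, DocOpts) in
            %% changes_row/3 below
            doc_options = [attachments]
        }.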

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_changes.erl
----------------------------------------------------------------------
diff --cc src/couch_changes.erl
index 4346109,0000000..f81ee60
mode 100644,000000..100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@@ -1,583 -1,0 +1,593 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_changes).
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-export([
 +    handle_changes/3,
 +    get_changes_timeout/2,
 +    wait_db_updated/3,
 +    get_rest_db_updated/1,
 +    configure_filter/4,
 +    filter/3
 +]).
 +
 +-export([changes_enumerator/2]).
 +
 +% For the builtin filter _doc_ids, this is the maximum number
 +% of documents for which we trigger the optimized code path.
 +-define(MAX_DOC_IDS, 100).
 +
 +-record(changes_acc, {
 +    db,
 +    seq,
 +    prepend,
 +    filter,
 +    callback,
 +    user_acc,
 +    resp_type,
 +    limit,
 +    include_docs,
++    doc_options,
 +    conflicts,
 +    timeout,
 +    timeout_fun
 +}).
 +
 +%% @type Req -> #httpd{} | {json_req, JsonObj()}
 +handle_changes(Args1, Req, Db0) ->
 +    #changes_args{
 +        style = Style,
 +        filter = FilterName,
 +        feed = Feed,
 +        dir = Dir,
 +        since = Since
 +    } = Args1,
 +    Filter = configure_filter(FilterName, Style, Req, Db0),
 +    Args = Args1#changes_args{filter_fun = Filter},
 +    Start = fun() ->
 +        {ok, Db} = couch_db:reopen(Db0),
 +        StartSeq = case Dir of
 +        rev ->
 +            couch_db:get_update_seq(Db);
 +        fwd ->
 +            Since
 +        end,
 +        {Db, StartSeq}
 +    end,
 +    % begin timer to deal with heartbeat when filter function fails
 +    case Args#changes_args.heartbeat of
 +    undefined ->
 +        erlang:erase(last_changes_heartbeat);
 +    Val when is_integer(Val); Val =:= true ->
 +        put(last_changes_heartbeat, now())
 +    end,
 +
 +    case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
 +    true ->
 +        fun(CallbackAcc) ->
 +            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
 +            Self = self(),
 +            {ok, Notify} = couch_db_update_notifier:start_link(
 +                fun({_, DbName}) when  Db0#db.name == DbName ->
 +                    Self ! db_updated;
 +                (_) ->
 +                    ok
 +                end
 +            ),
 +            {Db, StartSeq} = Start(),
 +            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
 +            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
 +            Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
 +                             <<"">>, Timeout, TimeoutFun),
 +            try
 +                keep_sending_changes(
 +                    Args#changes_args{dir=fwd},
 +                    Acc0,
 +                    true)
 +            after
 +                couch_db_update_notifier:stop(Notify),
 +                get_rest_db_updated(ok) % clean out any remaining update messages
 +            end
 +        end;
 +    false ->
 +        fun(CallbackAcc) ->
 +            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
 +            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
 +            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
 +            {Db, StartSeq} = Start(),
 +            Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
 +                             UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun),
 +            {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
 +                send_changes(
 +                    Args#changes_args{feed="normal"},
 +                    Acc0,
 +                    true),
 +            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
 +        end
 +    end.
 +
 +get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
 +    Pair;
 +get_callback_acc(Callback) when is_function(Callback, 2) ->
 +    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
 +
 +
 +configure_filter("_doc_ids", Style, Req, _Db) ->
 +    {doc_ids, Style, get_doc_ids(Req)};
 +configure_filter("_design", Style, _Req, _Db) ->
 +    {design_docs, Style};
 +configure_filter("_view", Style, Req, Db) ->
 +    ViewName = couch_httpd:qs_value(Req, "view", ""),
 +    if ViewName /= "" -> ok; true ->
 +        throw({bad_request, "`view` filter parameter is not provided."})
 +    end,
 +    ViewNameParts = string:tokens(ViewName, "/"),
 +    case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
 +        [DName, VName] ->
 +            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
 +            check_member_exists(DDoc, [<<"views">>, VName]),
 +            {view, Style, DDoc, VName};
 +        [] ->
 +            Msg = "`view` must be of the form `designname/viewname`",
 +            throw({bad_request, Msg})
 +    end;
 +configure_filter([$_ | _], _Style, _Req, _Db) ->
 +    throw({bad_request, "unknown builtin filter name"});
 +configure_filter("", main_only, _Req, _Db) ->
 +    {default, main_only};
 +configure_filter("", all_docs, _Req, _Db) ->
 +    {default, all_docs};
 +configure_filter(FilterName, Style, Req, Db) ->
 +    FilterNameParts = string:tokens(FilterName, "/"),
 +    case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
 +        [DName, FName] ->
 +            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
 +            check_member_exists(DDoc, [<<"filters">>, FName]),
 +            {custom, Style, Req, DDoc, FName};
 +        [] ->
 +            {default, Style};
 +        _Else ->
 +            Msg = "`filter` must be of the form `designname/filtername`",
 +            throw({bad_request, Msg})
 +    end.
 +
 +
 +filter(Db, #full_doc_info{}=FDI, Filter) ->
 +    filter(Db, couch_doc:to_doc_info(FDI), Filter);
 +filter(_Db, DocInfo, {default, Style}) ->
 +    apply_style(DocInfo, Style);
 +filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
 +    case lists:member(DocInfo#doc_info.id, DocIds) of
 +        true ->
 +            apply_style(DocInfo, Style);
 +        false ->
 +            []
 +    end;
 +filter(_Db, DocInfo, {design_docs, Style}) ->
 +    case DocInfo#doc_info.id of
 +        <<"_design", _/binary>> ->
 +            apply_style(DocInfo, Style);
 +        _ ->
 +            []
 +    end;
 +filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
 +    Docs = open_revs(Db, DocInfo, Style),
 +    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
 +    filter_revs(Passes, Docs);
 +filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
 +    Req = case Req0 of
 +        {json_req, _} -> Req0;
 +        #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
 +    end,
 +    Docs = open_revs(Db, DocInfo, Style),
 +    {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
 +    filter_revs(Passes, Docs).
 +
 +
 +get_doc_ids({json_req, {Props}}) ->
 +    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
 +get_doc_ids(#httpd{method='POST'}=Req) ->
 +    {Props} = couch_httpd:json_body_obj(Req),
 +    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
 +get_doc_ids(#httpd{method='GET'}=Req) ->
 +    DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
 +    check_docids(DocIds);
 +get_doc_ids(_) ->
 +    throw({bad_request, no_doc_ids_provided}).
 +
 +
 +check_docids(DocIds) when is_list(DocIds) ->
 +    lists:foreach(fun
 +        (DocId) when not is_binary(DocId) ->
 +            Msg = "`doc_ids` filter parameter is not a list of binaries.",
 +            throw({bad_request, Msg});
 +        (_) -> ok
 +    end, DocIds),
 +    DocIds;
 +check_docids(_) ->
 +    Msg = "`doc_ids` filter parameter is not a list of binaries.",
 +    throw({bad_request, Msg}).
 +
 +
 +open_ddoc(#db{name= <<"shards/", _/binary>> =ShardName}, DDocId) ->
 +    {_, Ref} = spawn_monitor(fun() ->
 +        exit(fabric:open_doc(mem3:dbname(ShardName), DDocId, []))
 +    end),
 +    receive
 +        {'DOWN', Ref, _, _, {ok, _}=Response} ->
 +            Response;
 +        {'DOWN', Ref, _, _, Response} ->
 +            throw(Response)
 +    end;
 +open_ddoc(Db, DDocId) ->
 +    case couch_db:open_doc(Db, DDocId, [ejson_body]) of
 +        {ok, _} = Resp -> Resp;
 +        Else -> throw(Else)
 +    end.
 +
 +
 +check_member_exists(#doc{body={Props}}, Path) ->
 +    couch_util:get_nested_json_value({Props}, Path).
 +
 +
 +apply_style(#doc_info{revs=Revs}, main_only) ->
 +    [#rev_info{rev=Rev} | _] = Revs,
 +    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
 +apply_style(#doc_info{revs=Revs}, all_docs) ->
 +    [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
 +
 +
 +open_revs(Db, DocInfo, Style) ->
 +    DocInfos = case Style of
 +        main_only -> [DocInfo];
 +        all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
 +    end,
 +    OpenOpts = [deleted, conflicts],
 +    % Relying on list comprehensions to silence errors
 +    OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
 +    [Doc || {ok, Doc} <- OpenResults].
 +
 +
 +filter_revs(Passes, Docs) ->
 +    lists:flatmap(fun
 +        ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
 +            RevStr = couch_doc:rev_to_str({RevPos, RevId}),
 +            Change = {[{<<"rev">>, RevStr}]},
 +            [Change];
 +        (_) ->
 +            []
 +    end, lists:zip(Passes, Docs)).
 +
 +
 +get_changes_timeout(Args, Callback) ->
 +    #changes_args{
 +        heartbeat = Heartbeat,
 +        timeout = Timeout,
 +        feed = ResponseType
 +    } = Args,
 +    DefaultTimeout = list_to_integer(
 +        config:get("httpd", "changes_timeout", "60000")
 +    ),
 +    case Heartbeat of
 +    undefined ->
 +        case Timeout of
 +        undefined ->
 +            {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
 +        infinity ->
 +            {infinity, fun(UserAcc) -> {stop, UserAcc} end};
 +        _ ->
 +            {lists:min([DefaultTimeout, Timeout]),
 +                fun(UserAcc) -> {stop, UserAcc} end}
 +        end;
 +    true ->
 +        {DefaultTimeout,
 +            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
 +    _ ->
 +        {lists:min([DefaultTimeout, Heartbeat]),
 +            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
 +    end.
 +
 +start_sending_changes(_Callback, UserAcc, ResponseType)
 +        when ResponseType =:= "continuous"
 +        orelse ResponseType =:= "eventsource" ->
 +    UserAcc;
 +start_sending_changes(Callback, UserAcc, ResponseType) ->
 +    Callback(start, ResponseType, UserAcc).
 +
 +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
 +    #changes_args{
 +        include_docs = IncludeDocs,
++        doc_options = DocOpts,
 +        conflicts = Conflicts,
 +        limit = Limit,
 +        feed = ResponseType,
 +        filter_fun = Filter
 +    } = Args,
 +    #changes_acc{
 +        db = Db,
 +        seq = StartSeq,
 +        prepend = Prepend,
 +        filter = Filter,
 +        callback = Callback,
 +        user_acc = UserAcc,
 +        resp_type = ResponseType,
 +        limit = Limit,
 +        include_docs = IncludeDocs,
++        doc_options = DocOpts,
 +        conflicts = Conflicts,
 +        timeout = Timeout,
 +        timeout_fun = TimeoutFun
 +    }.
 +
 +send_changes(Args, Acc0, FirstRound) ->
 +    #changes_args{
 +        dir = Dir
 +    } = Args,
 +    #changes_acc{
 +        db = Db,
 +        seq = StartSeq,
 +        filter = Filter
 +    } = Acc0,
 +    EnumFun = fun ?MODULE:changes_enumerator/2,
 +    case can_optimize(FirstRound, Filter) of
 +        {true, Fun} ->
 +            Fun(Db, StartSeq, Dir, EnumFun, Acc0, Filter);
 +        _ ->
 +            couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc0)
 +    end.
 +
 +
 +can_optimize(true, {doc_ids, _Style, DocIds})
 +        when length(DocIds) =< ?MAX_DOC_IDS ->
 +    {true, fun send_changes_doc_ids/6};
 +can_optimize(true, {design_docs, _Style}) ->
 +    {true, fun send_changes_design_docs/6};
 +can_optimize(_, _) ->
 +    false.
 +
 +
 +send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
 +    Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
 +    FullInfos = lists:foldl(fun
 +        ({ok, FDI}, Acc) -> [FDI | Acc];
 +        (not_found, Acc) -> Acc
 +    end, [], Lookups),
 +    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
 +
 +
 +send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
 +    FoldFun = fun(FullDocInfo, _, Acc) ->
 +        {ok, [FullDocInfo | Acc]}
 +    end,
 +    KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
 +    {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
 +    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
 +
 +
 +send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
 +    FoldFun = case Dir of
 +        fwd -> fun lists:foldl/3;
 +        rev -> fun lists:foldr/3
 +    end,
 +    GreaterFun = case Dir of
 +        fwd -> fun(A, B) -> A > B end;
 +        rev -> fun(A, B) -> A =< B end
 +    end,
 +    DocInfos = lists:foldl(fun(FDI, Acc) ->
 +        DI = couch_doc:to_doc_info(FDI),
 +        case GreaterFun(DI#doc_info.high_seq, StartSeq) of
 +            true -> [DI | Acc];
 +            false -> Acc
 +        end
 +    end, [], FullDocInfos),
 +    SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
 +    FinalAcc = try
 +        FoldFun(fun(DocInfo, Acc) ->
 +            case Fun(DocInfo, Acc) of
 +                {ok, NewAcc} ->
 +                    NewAcc;
 +                {stop, NewAcc} ->
 +                    throw({stop, NewAcc})
 +            end
 +        end, Acc0, SortedDocInfos)
 +    catch
 +        {stop, Acc} -> Acc
 +    end,
 +    case Dir of
 +        fwd -> {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
 +        rev -> {ok, FinalAcc}
 +    end.
 +
 +
 +keep_sending_changes(Args, Acc0, FirstRound) ->
 +    #changes_args{
 +        feed = ResponseType,
 +        limit = Limit,
 +        db_open_options = DbOptions
 +    } = Args,
 +
 +    {ok, ChangesAcc} = send_changes(
 +        Args#changes_args{dir=fwd},
 +        Acc0,
 +        FirstRound),
 +    #changes_acc{
 +        db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
 +        seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
 +    } = ChangesAcc,
 +
 +    couch_db:close(Db),
 +    if Limit > NewLimit, ResponseType == "longpoll" ->
 +        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
 +    true ->
 +        case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
 +        {updated, UserAcc4} ->
 +            DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
 +            case couch_db:open(Db#db.name, DbOptions1) of
 +            {ok, Db2} ->
 +                keep_sending_changes(
 +                  Args#changes_args{limit=NewLimit},
 +                  ChangesAcc#changes_acc{
 +                    db = Db2,
 +                    user_acc = UserAcc4,
 +                    seq = EndSeq,
 +                    prepend = Prepend2,
 +                    timeout = Timeout,
 +                    timeout_fun = TimeoutFun},
 +                  false);
 +            _Else ->
 +                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
 +            end;
 +        {stop, UserAcc4} ->
 +            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
 +        end
 +    end.
 +
 +end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
 +    Callback({stop, EndSeq}, ResponseType, UserAcc).
 +
 +changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
 +        when ResponseType =:= "continuous"
 +        orelse ResponseType =:= "eventsource" ->
 +    #changes_acc{
 +        filter = Filter, callback = Callback,
 +        user_acc = UserAcc, limit = Limit, db = Db,
 +        timeout = Timeout, timeout_fun = TimeoutFun
 +    } = Acc,
 +    #doc_info{high_seq = Seq} = DocInfo,
 +    Results0 = filter(Db, DocInfo, Filter),
 +    Results = [Result || Result <- Results0, Result /= null],
 +    %% TODO: I'm thinking this should be < 1 and not =< 1
 +    Go = if Limit =< 1 -> stop; true -> ok end,
 +    case Results of
 +    [] ->
 +        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
 +        case Done of
 +        stop ->
 +            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
 +        ok ->
 +            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
 +        end;
 +    _ ->
 +        ChangesRow = changes_row(Results, DocInfo, Acc),
 +        UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
 +        reset_heartbeat(),
 +        {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
 +    end;
 +changes_enumerator(DocInfo, Acc) ->
 +    #changes_acc{
 +        filter = Filter, callback = Callback, prepend = Prepend,
 +        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
 +        timeout = Timeout, timeout_fun = TimeoutFun
 +    } = Acc,
 +    #doc_info{high_seq = Seq} = DocInfo,
 +    Results0 = filter(Db, DocInfo, Filter),
 +    Results = [Result || Result <- Results0, Result /= null],
 +    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
 +    case Results of
 +    [] ->
 +        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
 +        case Done of
 +        stop ->
 +            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
 +        ok ->
 +            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
 +        end;
 +    _ ->
 +        ChangesRow = changes_row(Results, DocInfo, Acc),
 +        UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
 +        reset_heartbeat(),
 +        {Go, Acc#changes_acc{
 +            seq = Seq, prepend = <<",\n">>,
 +            user_acc = UserAcc2, limit = Limit - 1}}
 +    end.
 +
 +
 +changes_row(Results, DocInfo, Acc) ->
 +    #doc_info{
 +        id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
 +    } = DocInfo,
-     #changes_acc{db = Db, include_docs = IncDoc, conflicts = Conflicts} = Acc,
++    #changes_acc{
++        db = Db,
++        include_docs = IncDoc,
++        doc_options = DocOpts,
++        conflicts = Conflicts
++    } = Acc,
 +    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
 +        deleted_item(Del) ++ case IncDoc of
 +            true ->
 +                Opts = case Conflicts of
 +                    true -> [deleted, conflicts];
 +                    false -> [deleted]
 +                end,
 +                Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
 +                case Doc of
-                     null -> [{doc, null}];
-                     _ ->  [{doc, couch_doc:to_json_obj(Doc, [])}]
++                    null ->
++                        [{doc, null}];
++                    _ ->
++                        [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
 +                end;
 +            false ->
 +                []
 +        end}.
 +
 +deleted_item(true) -> [{<<"deleted">>, true}];
 +deleted_item(_) -> [].
 +
 +% waits for a db_updated msg; if there are multiple msgs, collects them.
 +wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
 +    receive
 +    db_updated ->
 +        get_rest_db_updated(UserAcc)
 +    after Timeout ->
 +        {Go, UserAcc2} = TimeoutFun(UserAcc),
 +        case Go of
 +        ok ->
 +            wait_db_updated(Timeout, TimeoutFun, UserAcc2);
 +        stop ->
 +            {stop, UserAcc2}
 +        end
 +    end.
 +
 +get_rest_db_updated(UserAcc) ->
 +    receive
 +    db_updated ->
 +        get_rest_db_updated(UserAcc)
 +    after 0 ->
 +        {updated, UserAcc}
 +    end.
 +
 +reset_heartbeat() ->
 +    case get(last_changes_heartbeat) of
 +    undefined ->
 +        ok;
 +    _ ->
 +        put(last_changes_heartbeat, now())
 +    end.
 +
 +maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
 +    Before = get(last_changes_heartbeat),
 +    case Before of
 +    undefined ->
 +        {ok, Acc};
 +    _ ->
 +        Now = now(),
 +        case timer:now_diff(Now, Before) div 1000 >= Timeout of
 +        true ->
 +            Acc2 = TimeoutFun(Acc),
 +            put(last_changes_heartbeat, Now),
 +            Acc2;
 +        false ->
 +            {ok, Acc}
 +        end
 +    end.
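
One pattern worth noting in the module above: wait_db_updated/3 blocks
for a single db_updated message, then get_rest_db_updated/1 drains any
queued duplicates with a zero timeout, so a burst of updates triggers
one rescan of the changes feed rather than one per notification. A
standalone sketch of that coalescing pattern (module and function names
are illustrative only):

    -module(drain_example).
    -export([wait_updated/1]).

    %% Block until at least one db_updated notification arrives (or the
    %% timeout fires), then swallow any duplicates already queued.
    wait_updated(Timeout) ->
        receive
            db_updated -> drain()
        after Timeout ->
            timeout
        end.

    %% Zero-timeout receive: consumes every db_updated already in the
    %% mailbox, then returns.
    drain() ->
        receive
            db_updated -> drain()
        after 0 ->
            updated
        end.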

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/9d629ff6/src/couch_db.erl
----------------------------------------------------------------------
diff --cc src/couch_db.erl
index 32a0049,0000000..018878f
mode 100644,000000..100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@@ -1,1412 -1,0 +1,1417 @@@
 +% Licensed under the Apache License, Version 2.0 (the "License"); you may not
 +% use this file except in compliance with the License. You may obtain a copy of
 +% the License at
 +%
 +%   http://www.apache.org/licenses/LICENSE-2.0
 +%
 +% Unless required by applicable law or agreed to in writing, software
 +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 +% License for the specific language governing permissions and limitations under
 +% the License.
 +
 +-module(couch_db).
 +
 +-export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
 +-export([start_compact/1, cancel_compact/1]).
 +-export([wait_for_compaction/1, wait_for_compaction/2]).
 +-export([is_idle/1,monitor/1,count_changes_since/2]).
 +-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
 +-export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
 +-export([open_doc/2,open_doc/3,open_doc_revs/4]).
 +-export([set_revs_limit/2,get_revs_limit/1]).
 +-export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
 +-export([enum_docs/4,enum_docs_since/5]).
 +-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
 +-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
 +-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
 +-export([set_security/2,get_security/1]).
 +-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
 +-export([check_is_admin/1, check_is_member/1, get_doc_count/1]).
 +-export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
 +-export([load_validation_funs/1]).
 +
 +-include_lib("couch/include/couch_db.hrl").
 +
 +-define(VALID_DB_NAME, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$").
 +
 +start_link(DbName, Filepath, Options) ->
 +    case open_db_file(Filepath, Options) of
 +    {ok, Fd} ->
 +        {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
 +            Filepath, Fd, Options}, []),
 +        unlink(Fd),
 +        gen_server:call(UpdaterPid, get_db);
 +    Else ->
 +        Else
 +    end.
 +
 +open_db_file(Filepath, Options) ->
 +    case couch_file:open(Filepath, Options) of
 +    {ok, Fd} ->
 +        {ok, Fd};
 +    {error, enoent} ->
 +        % couldn't find the file. Is there a compact version? This can happen
 +        % if we crashed during the file switch.
 +        case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
 +        {ok, Fd} ->
 +            ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
 +            ok = file:rename(Filepath ++ ".compact", Filepath),
 +            ok = couch_file:sync(Fd),
 +            {ok, Fd};
 +        {error, enoent} ->
 +            {not_found, no_db_file}
 +        end;
 +    Error ->
 +        Error
 +    end.
 +
 +
 +create(DbName, Options) ->
 +    couch_server:create(DbName, Options).
 +
 +% this is for opening a database for internal purposes like the replicator
 +% or the view indexer. it never throws a reader error.
 +open_int(DbName, Options) ->
 +    couch_server:open(DbName, Options).
 +
 +% this should be called anytime an http request opens the database.
 +% it ensures that the http userCtx is a valid reader
 +open(DbName, Options) ->
 +    case couch_server:open(DbName, Options) of
 +        {ok, Db} ->
 +            try
 +                check_is_member(Db),
 +                {ok, Db}
 +            catch
 +                throw:Error ->
 +                    close(Db),
 +                    throw(Error)
 +            end;
 +        Else -> Else
 +    end.
 +
 +reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
 +    {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
 +    case NewFd =:= Fd of
 +    true ->
 +        {ok, NewDb#db{user_ctx = UserCtx}};
 +    false ->
 +        erlang:demonitor(OldRef, [flush]),
 +        NewRef = erlang:monitor(process, NewFd),
 +        {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
 +    end.
 +
 +is_system_db(#db{options = Options}) ->
 +    lists:member(sys_db, Options).
 +
 +ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
 +    ok = gen_server:call(Pid, full_commit, infinity),
 +    {ok, StartTime}.
 +
 +ensure_full_commit(Db, RequiredSeq) ->
 +    #db{main_pid=Pid, instance_start_time=StartTime} = Db,
 +    ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
 +    {ok, StartTime}.
 +
 +close(#db{fd_monitor=RefCntr}) ->
 +    erlang:demonitor(RefCntr, [flush]),
 +    ok.
 +
 +is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
 +    case erlang:process_info(Db#db.fd, monitored_by) of
 +    undefined ->
 +        true;
 +    {monitored_by, Pids} ->
 +        (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= []
 +    end;
 +is_idle(_Db) ->
 +    false.
 +
 +monitor(#db{main_pid=MainPid}) ->
 +    erlang:monitor(process, MainPid).
 +
 +start_compact(#db{main_pid=Pid}) ->
 +    gen_server:call(Pid, start_compact).
 +
 +cancel_compact(#db{main_pid=Pid}) ->
 +    gen_server:call(Pid, cancel_compact).
 +
 +wait_for_compaction(Db) ->
 +    wait_for_compaction(Db, infinity).
 +
 +wait_for_compaction(#db{main_pid=Pid}=Db, Timeout) ->
 +    Start = erlang:now(),
 +    case gen_server:call(Pid, compactor_pid) of
 +        CPid when is_pid(CPid) ->
 +            Ref = erlang:monitor(process, CPid),
 +            receive
 +                {'DOWN', Ref, _, _, normal} when Timeout == infinity ->
 +                    wait_for_compaction(Db, Timeout);
 +                {'DOWN', Ref, _, _, normal} ->
 +                    Elapsed = timer:now_diff(now(), Start) div 1000,
 +                    wait_for_compaction(Db, Timeout - Elapsed);
 +                {'DOWN', Ref, _, _, Reason} ->
 +                    {error, Reason}
 +            after Timeout ->
 +                erlang:demonitor(Ref, [flush]),
 +                {error, Timeout}
 +            end;
 +        _ ->
 +            ok
 +    end.
 +
 +delete_doc(Db, Id, Revisions) ->
 +    DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
 +    {ok, [Result]} = update_docs(Db, DeletedDocs, []),
 +    {ok, Result}.
 +
 +open_doc(Db, IdOrDocInfo) ->
 +    open_doc(Db, IdOrDocInfo, []).
 +
 +open_doc(Db, Id, Options) ->
 +    increment_stat(Db, {couchdb, database_reads}),
 +    case open_doc_int(Db, Id, Options) of
 +    {ok, #doc{deleted=true}=Doc} ->
 +        case lists:member(deleted, Options) of
 +        true ->
 +            apply_open_options({ok, Doc},Options);
 +        false ->
 +            {not_found, deleted}
 +        end;
 +    Else ->
 +        apply_open_options(Else,Options)
 +    end.
 +
 +apply_open_options({ok, Doc},Options) ->
 +    apply_open_options2(Doc,Options);
 +apply_open_options(Else,_Options) ->
 +    Else.
 +
 +apply_open_options2(Doc,[]) ->
 +    {ok, Doc};
 +apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
 +        [{atts_since, PossibleAncestors}|Rest]) ->
 +    RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
 +    apply_open_options2(Doc#doc{atts=[A#att{data=
 +        if AttPos>RevPos -> Data; true -> stub end}
 +        || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
 +apply_open_options2(Doc, [ejson_body | Rest]) ->
 +    apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
 +apply_open_options2(Doc,[_|Rest]) ->
 +    apply_open_options2(Doc,Rest).
 +
 +
 +find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
 +    0;
 +find_ancestor_rev_pos(_DocRevs, []) ->
 +    0;
 +find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
 +    case lists:member({RevPos, RevId}, AttsSinceRevs) of
 +    true ->
 +        RevPos;
 +    false ->
 +        find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
 +    end.
 +
 +open_doc_revs(Db, Id, Revs, Options) ->
 +    increment_stat(Db, {couchdb, database_reads}),
 +    [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
 +    {ok, [apply_open_options(Result, Options) || Result <- Results]}.
 +
 +% Each returned result is a list of tuples:
 +% {Id, MissingRevs, PossibleAncestors}
 +% if no revs are missing, it's omitted from the results.
 +get_missing_revs(Db, IdRevsList) ->
 +    Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
 +    {ok, find_missing(IdRevsList, Results)}.
 +
 +find_missing([], []) ->
 +    [];
 +find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
 +    case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
 +    [] ->
 +        find_missing(RestIdRevs, RestLookupInfo);
 +    MissingRevs ->
 +        #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
 +        LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
 +        % Find the revs that are possible parents of this rev
 +        PossibleAncestors =
 +        lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
 +            % this leaf is a "possible ancestor" of the missing
 +            % revs if this LeafPos is less than any of the missing revs
 +            case lists:any(fun({MissingPos, _}) ->
 +                    LeafPos < MissingPos end, MissingRevs) of
 +            true ->
 +                [{LeafPos, LeafRevId} | Acc];
 +            false ->
 +                Acc
 +            end
 +        end, [], LeafRevs),
 +        [{Id, MissingRevs, PossibleAncestors} |
 +                find_missing(RestIdRevs, RestLookupInfo)]
 +    end;
 +find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
 +    [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
 +
 +get_doc_info(Db, Id) ->
 +    case get_full_doc_info(Db, Id) of
 +    {ok, DocInfo} ->
 +        {ok, couch_doc:to_doc_info(DocInfo)};
 +    Else ->
 +        Else
 +    end.
 +
 +%   returns {ok, DocInfo} or not_found
 +get_full_doc_info(Db, Id) ->
 +    [Result] = get_full_doc_infos(Db, [Id]),
 +    Result.
 +
 +get_full_doc_infos(Db, Ids) ->
 +    couch_btree:lookup(Db#db.id_tree, Ids).
 +
 +increment_update_seq(#db{main_pid=Pid}) ->
 +    gen_server:call(Pid, increment_update_seq).
 +
 +purge_docs(#db{main_pid=Pid}, IdsRevs) ->
 +    gen_server:call(Pid, {purge_docs, IdsRevs}).
 +
 +get_committed_update_seq(#db{committed_update_seq=Seq}) ->
 +    Seq.
 +
 +get_update_seq(#db{update_seq=Seq})->
 +    Seq.
 +
 +get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
 +    PurgeSeq.
 +
 +get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
 +    {ok, []};
 +get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
 +    couch_file:pread_term(Fd, PurgedPointer).
 +
 +get_doc_count(Db) ->
 +    {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
 +    {ok, Count}.
 +
 +get_db_info(Db) ->
 +    #db{fd=Fd,
 +        header=#db_header{disk_version=DiskVersion},
 +        compactor_pid=Compactor,
 +        update_seq=SeqNum,
 +        name=Name,
 +        instance_start_time=StartTime,
 +        committed_update_seq=CommittedUpdateSeq,
 +        id_tree = IdBtree,
 +        seq_tree = SeqBtree,
 +        local_tree = LocalBtree
 +    } = Db,
 +    {ok, Size} = couch_file:bytes(Fd),
 +    {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
 +    InfoList = [
 +        {db_name, Name},
 +        {doc_count, element(1, DbReduction)},
 +        {doc_del_count, element(2, DbReduction)},
 +        {update_seq, SeqNum},
 +        {purge_seq, couch_db:get_purge_seq(Db)},
 +        {compact_running, Compactor/=nil},
 +        {disk_size, Size},
 +        {data_size, db_data_size(DbReduction, [SeqBtree, IdBtree, LocalBtree])},
 +        {instance_start_time, StartTime},
 +        {disk_format_version, DiskVersion},
 +        {committed_update_seq, CommittedUpdateSeq}
 +        ],
 +    {ok, InfoList}.
 +
 +db_data_size({_Count, _DelCount}, _Trees) ->
 +    % pre 1.2 format, upgraded on compaction
 +    null;
 +db_data_size({_Count, _DelCount, nil}, _Trees) ->
 +    null;
 +db_data_size({_Count, _DelCount, DocAndAttsSize}, Trees) ->
 +    sum_tree_sizes(DocAndAttsSize, Trees).
 +
 +sum_tree_sizes(Acc, []) ->
 +    Acc;
 +sum_tree_sizes(Acc, [T | Rest]) ->
 +    case couch_btree:size(T) of
 +    nil ->
 +        null;
 +    Sz ->
 +        sum_tree_sizes(Acc + Sz, Rest)
 +    end.
 +
 +get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
 +    {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
 +    receive {'DOWN', Ref, _, _, Response} ->
 +        Response
 +    end;
 +get_design_docs(#db{id_tree = IdBtree}) ->
 +    FoldFun = skip_deleted(fun
 +        (#full_doc_info{deleted = true}, _Reds, Acc) ->
 +            {ok, Acc};
 +        (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
 +            {ok, [FullDocInfo | Acc]};
 +        (_, _Reds, Acc) ->
 +            {stop, Acc}
 +    end),
 +    KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
 +    {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
 +    {ok, Docs}.
 +
 +check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
 +    {Admins} = get_admins(Db),
 +    AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
 +    AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
 +    case AdminRoles -- Roles of
 +    AdminRoles -> % same list, not an admin role
 +        case AdminNames -- [Name] of
 +        AdminNames -> % same names, not an admin
 +            throw({unauthorized, <<"You are not a db or server admin.">>});
 +        _ ->
 +            ok
 +        end;
 +    _ ->
 +        ok
 +    end.
 +
 +check_is_member(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
 +    case (catch check_is_admin(Db)) of
 +    ok -> ok;
 +    _ ->
 +        {Members} = get_members(Db),
 +        ReaderRoles = couch_util:get_value(<<"roles">>, Members,[]),
 +        WithAdminRoles = [<<"_admin">> | ReaderRoles],
 +        ReaderNames = couch_util:get_value(<<"names">>, Members,[]),
 +        case ReaderRoles ++ ReaderNames of
 +        [] -> ok; % no readers == public access
 +        _Else ->
 +            case WithAdminRoles -- Roles of
 +            WithAdminRoles -> % same list, not a reader role
 +                case ReaderNames -- [Name] of
 +                ReaderNames -> % same names, not a reader
 +                    ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
 +                    throw({unauthorized, <<"You are not authorized to access this db.">>});
 +                _ ->
 +                    ok
 +                end;
 +            _ ->
 +                ok
 +            end
 +        end
 +    end.
 +
 +get_admins(#db{security=SecProps}) ->
 +    couch_util:get_value(<<"admins">>, SecProps, {[]}).
 +
 +get_members(#db{security=SecProps}) ->
 +    % we fall back to readers here for backwards compatibility
 +    couch_util:get_value(<<"members">>, SecProps,
 +        couch_util:get_value(<<"readers">>, SecProps, {[]})).
 +
 +get_security(#db{security=SecProps}) ->
 +    {SecProps}.
 +
 +set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
 +    check_is_admin(Db),
 +    ok = validate_security_object(NewSecProps),
 +    ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
 +    {ok, _} = ensure_full_commit(Db),
 +    ok;
 +set_security(_, _) ->
 +    throw(bad_request).
 +
 +validate_security_object(SecProps) ->
 +    Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
 +    % we fall back to readers here for backwards compatibility
 +    Members = couch_util:get_value(<<"members">>, SecProps,
 +        couch_util:get_value(<<"readers">>, SecProps, {[]})),
 +    ok = validate_names_and_roles(Admins),
 +    ok = validate_names_and_roles(Members),
 +    ok.
 +
 +% validate user input
 +validate_names_and_roles({Props}) when is_list(Props) ->
 +    case couch_util:get_value(<<"names">>,Props,[]) of
 +    Ns when is_list(Ns) ->
 +            [throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
 +            Ns;
 +    _ -> throw("names must be a JSON list of strings")
 +    end,
 +    case couch_util:get_value(<<"roles">>,Props,[]) of
 +    Rs when is_list(Rs) ->
 +        [throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
 +        Rs;
 +    _ -> throw("roles must be a JSON list of strings")
 +    end,
 +    ok.
 +
 +get_revs_limit(#db{revs_limit=Limit}) ->
 +    Limit.
 +
 +set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
 +    check_is_admin(Db),
 +    gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
 +set_revs_limit(_Db, _Limit) ->
 +    throw(invalid_revs_limit).
 +
 +name(#db{name=Name}) ->
 +    Name.
 +
 +compression(#db{compression=Compression}) ->
 +    Compression.
 +
 +update_doc(Db, Doc, Options) ->
 +    update_doc(Db, Doc, Options, interactive_edit).
 +
 +update_doc(Db, Doc, Options, UpdateType) ->
 +    case update_docs(Db, [Doc], Options, UpdateType) of
 +    {ok, [{ok, NewRev}]} ->
 +        {ok, NewRev};
 +    {ok, [{{_Id, _Rev}, Error}]} ->
 +        throw(Error);
 +    {ok, [Error]} ->
 +        throw(Error);
 +    {ok, []} ->
 +        % replication success
 +        {Pos, [RevId | _]} = Doc#doc.revs,
 +        {ok, {Pos, RevId}}
 +    end.
 +
 +update_docs(Db, Docs) ->
 +    update_docs(Db, Docs, []).
 +
 +% group_alike_docs groups the sorted documents into sublist buckets, by id.
 +% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 +group_alike_docs(Docs) ->
 +    Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
 +    group_alike_docs(Sorted, []).
 +
 +group_alike_docs([], Buckets) ->
 +    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
 +group_alike_docs([Doc|Rest], []) ->
 +    group_alike_docs(Rest, [[Doc]]);
 +group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
 +    [{#doc{id=BucketId},_Ref}|_] = Bucket,
 +    case Doc#doc.id == BucketId of
 +    true ->
 +        % add to existing bucket
 +        group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
 +    false ->
 +        % add to new bucket
 +       group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
 +    end.
 +
 +validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
 +    case catch check_is_admin(Db) of
 +        ok -> validate_ddoc(Db#db.name, Doc);
 +        Error -> Error
 +    end;
 +validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
 +    ValidationFuns = load_validation_funs(Db),
 +    validate_doc_update(Db#db{validate_doc_funs=ValidationFuns}, Doc, Fun);
 +validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
 +    ok;
 +validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
 +    ok;
 +validate_doc_update(Db, Doc, GetDiskDocFun) ->
 +    case get(io_priority) of
 +        {internal_repl, _} ->
 +            ok;
 +        _ ->
 +            validate_doc_update_int(Db, Doc, GetDiskDocFun)
 +    end.
 +
 +validate_ddoc(DbName, DDoc) ->
 +    try
 +        couch_index_server:validate(DbName, couch_doc:with_ejson_body(DDoc))
 +    catch
 +        throw:Error ->
 +            Error
 +    end.
 +
 +validate_doc_update_int(Db, Doc, GetDiskDocFun) ->
 +    DiskDoc = GetDiskDocFun(),
 +    JsonCtx = couch_util:json_user_ctx(Db),
 +    SecObj = get_security(Db),
 +    try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
 +            ok -> ok;
 +            Error -> throw(Error)
 +        end || Fun <- Db#db.validate_doc_funs],
 +        ok
 +    catch
 +        throw:Error ->
 +            Error
 +    end.
 +
 +
 +% to be safe, spawn a middleman here
 +load_validation_funs(#db{main_pid=Pid, name = <<"shards/", _/binary>>}=Db) ->
 +    {_, Ref} = spawn_monitor(fun() ->
 +        exit(ddoc_cache:open(mem3:dbname(Db#db.name), validation_funs))
 +    end),
 +    receive
 +        {'DOWN', Ref, _, _, {ok, Funs}} ->
 +            gen_server:cast(Pid, {load_validation_funs, Funs}),
 +            Funs;
 +        {'DOWN', Ref, _, _, Reason} ->
 +            ?LOG_ERROR("could not load validation funs ~p", [Reason]),
 +            throw(internal_server_error)
 +    end;
 +load_validation_funs(#db{main_pid=Pid}=Db) ->
 +    {ok, DDocInfos} = get_design_docs(Db),
 +    OpenDocs = fun
 +        (#full_doc_info{}=D) ->
 +            {ok, Doc} = open_doc_int(Db, D, [ejson_body]),
 +            Doc
 +    end,
 +    DDocs = lists:map(OpenDocs, DDocInfos),
 +    Funs = lists:flatmap(fun(DDoc) ->
 +        case couch_doc:get_validate_doc_fun(DDoc) of
 +            nil -> [];
 +            Fun -> [Fun]
 +        end
 +    end, DDocs),
 +    gen_server:cast(Pid, {load_validation_funs, Funs}),
 +    Funs.
 +
 +prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
 +        OldFullDocInfo, LeafRevsDict, AllowConflict) ->
 +    case Revs of
 +    [PrevRev|_] ->
 +        case dict:find({RevStart, PrevRev}, LeafRevsDict) of
 +        {ok, {#leaf{deleted=Deleted, ptr=DiskSp}, DiskRevs}} ->
 +            case couch_doc:has_stubs(Doc) of
 +            true ->
 +                DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
 +                Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
 +                {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
 +            false ->
 +                LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
 +                {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
 +            end;
 +        error when AllowConflict ->
 +            couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
 +                                                        % there are stubs
 +            {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
 +        error ->
 +            {conflict, Doc}
 +        end;
 +    [] ->
 +        % new doc, and we have existing revs.
 +        % reuse existing deleted doc
 +        if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
 +            {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
 +        true ->
 +            {conflict, Doc}
 +        end
 +    end.
 +
 +
 +
 +prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
 +        AccFatalErrors) ->
 +   {AccPrepped, AccFatalErrors};
 +prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
 +        AllowConflict, AccPrepped, AccErrors) ->
 +    {PreppedBucket, AccErrors3} = lists:foldl(
 +        fun({#doc{revs=Revs}=Doc,Ref}, {AccBucket, AccErrors2}) ->
 +            case couch_doc:has_stubs(Doc) of
 +            true ->
 +                couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
 +            false -> ok
 +            end,
 +            case Revs of
 +            {0, []} ->
 +                case validate_doc_update(Db, Doc, fun() -> nil end) of
 +                ok ->
 +                    {[{Doc, Ref} | AccBucket], AccErrors2};
 +                Error ->
 +                    {AccBucket, [{Ref, Error} | AccErrors2]}
 +                end;
 +            _ ->
 +                % old revs specified but none exist, a conflict
 +                {AccBucket, [{Ref, conflict} | AccErrors2]}
 +            end
 +        end,
 +        {[], AccErrors}, DocBucket),
 +
 +    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
 +            [lists:reverse(PreppedBucket) | AccPrepped], AccErrors3);
 +prep_and_validate_updates(Db, [DocBucket|RestBuckets],
 +        [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
 +        AllowConflict, AccPrepped, AccErrors) ->
 +    Leafs = couch_key_tree:get_all_leafs(OldRevTree),
 +    LeafRevsDict = dict:from_list([
 +        {{Start, RevId}, {Leaf, Revs}} ||
 +        {Leaf, {Start, [RevId | _]} = Revs} <- Leafs
 +    ]),
 +    {PreppedBucket, AccErrors3} = lists:foldl(
 +        fun({Doc, Ref}, {Docs2Acc, AccErrors2}) ->
 +            case prep_and_validate_update(Db, Doc, OldFullDocInfo,
 +                    LeafRevsDict, AllowConflict) of
 +            {ok, Doc2} ->
 +                {[{Doc2, Ref} | Docs2Acc], AccErrors2};
 +            {Error, #doc{}} ->
 +                % Record the error
 +                {Docs2Acc, [{Ref, Error} |AccErrors2]}
 +            end
 +        end,
 +        {[], AccErrors}, DocBucket),
 +    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
 +            [PreppedBucket | AccPrepped], AccErrors3).
 +
 +
 +update_docs(Db, Docs, Options) ->
 +    update_docs(Db, Docs, Options, interactive_edit).
 +
 +
 +prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
 +    Errors2 = [{{Id, {Pos, Rev}}, Error} ||
 +            {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
 +    {lists:reverse(AccPrepped), lists:reverse(Errors2)};
 +prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
 +    case OldInfo of
 +    not_found ->
 +        {ValidatedBucket, AccErrors3} = lists:foldl(
 +            fun({Doc, Ref}, {AccPrepped2, AccErrors2}) ->
 +                case couch_doc:has_stubs(Doc) of
 +                true ->
 +                    couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
 +                false -> ok
 +                end,
 +                case validate_doc_update(Db, Doc, fun() -> nil end) of
 +                ok ->
 +                    {[{Doc, Ref} | AccPrepped2], AccErrors2};
 +                Error ->
 +                    {AccPrepped2, [{Doc, Error} | AccErrors2]}
 +                end
 +            end,
 +            {[], AccErrors}, Bucket),
 +        prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
 +    {ok, #full_doc_info{rev_tree=OldTree}} ->
 +        OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
 +        OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
 +        NewRevTree = lists:foldl(
 +            fun({NewDoc, _Ref}, AccTree) ->
 +                {NewTree, _} = couch_key_tree:merge(AccTree,
 +                    couch_doc:to_path(NewDoc), Db#db.revs_limit),
 +                NewTree
 +            end,
 +            OldTree, Bucket),
 +        Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
 +        LeafRevsFullDict = dict:from_list([
 +            {{Start, RevId}, FullPath} ||
 +            {Start, [{RevId, _}|_]}=FullPath <- Leafs
 +        ]),
 +        {ValidatedBucket, AccErrors3} =
 +        lists:foldl(
 +            fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
 +                IsOldLeaf = lists:member({Pos, RevId}, OldLeafsLU),
 +                case dict:find({Pos, RevId}, LeafRevsFullDict) of
 +                {ok, {Start, Path}} when not IsOldLeaf ->
 +                    % our unflushed doc is a leaf node. Go back on the path
 +                    % to find the previous rev that's on disk.
 +
 +                    LoadPrevRevFun = fun() ->
 +                                make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
 +                            end,
 +
 +                    case couch_doc:has_stubs(Doc) of
 +                    true ->
 +                        DiskDoc = case LoadPrevRevFun() of
 +                            #doc{} = DiskDoc0 ->
 +                                DiskDoc0;
 +                            _ ->
 +                                % Force a missing_stub exception
 +                                couch_doc:merge_stubs(Doc, #doc{})
 +                        end,
 +                        Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
 +                        GetDiskDocFun = fun() -> DiskDoc end;
 +                    false ->
 +                        Doc2 = Doc,
 +                        GetDiskDocFun = LoadPrevRevFun
 +                    end,
 +
 +                    case validate_doc_update(Db, Doc2, GetDiskDocFun) of
 +                    ok ->
 +                        {[{Doc2, Ref} | AccValidated], AccErrors2};
 +                    Error ->
 +                        {AccValidated, [{Doc, Error} | AccErrors2]}
 +                    end;
 +                _ ->
 +                    % this doc isn't a leaf, or it already exists in the
 +                    % tree; ignore it, but count it as a success.
 +                    {AccValidated, AccErrors2}
 +                end
 +            end,
 +            {[], AccErrors}, Bucket),
 +        prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
 +                [ValidatedBucket | AccPrepped], AccErrors3)
 +    end.
 +
 +
 +new_revid(#doc{body=Body,revs={OldStart,OldRevs},
 +        atts=Atts,deleted=Deleted}) ->
 +    case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
 +    Atts2 when length(Atts) =/= length(Atts2) ->
 +        % We must have old-style, non-md5 attachments
 +        ?l2b(integer_to_list(couch_util:rand32()));
 +    Atts2 ->
 +        OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
 +        couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
 +    end.
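 +
 +% Example (hypothetical doc): a brand-new doc with no attachments gets a
 +% deterministic revid, the md5 of term_to_binary([false, 0, 0, Body, []]),
 +% so new_revid(#doc{body={[]}, revs={0, []}, atts=[], deleted=false})
 +% returns a 16-byte binary digest; only docs carrying legacy non-md5
 +% attachments fall back to a random revid.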
 +
 +new_revs([], OutBuckets, IdRevsAcc) ->
 +    {lists:reverse(OutBuckets), IdRevsAcc};
 +new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
 +    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
 +        fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2) ->
 +            NewRevId = new_revid(Doc),
 +            {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
 +                [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
 +        end, IdRevsAcc, Bucket),
 +    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 +
 +check_dup_atts(#doc{atts=Atts}=Doc) ->
 +    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
 +    check_dup_atts2(Atts2),
 +    Doc.
 +
 +check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
 +    throw({bad_request, <<"Duplicate attachments">>});
 +check_dup_atts2([_ | Rest]) ->
 +    check_dup_atts2(Rest);
 +check_dup_atts2(_) ->
 +    ok.
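 +
 +% Example: two attachments sharing a name make the doc invalid, e.g.
 +%   check_dup_atts(#doc{atts=[#att{name= <<"a">>}, #att{name= <<"a">>}]})
 +% throws {bad_request, <<"Duplicate attachments">>}; docs whose attachment
 +% names are all distinct are returned unchanged.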
 +
 +
 +update_docs(Db, Docs, Options, replicated_changes) ->
 +    increment_stat(Db, {couchdb, database_writes}),
 +    % associate a reference with each doc in order to track duplicates
 +    Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs),
 +    DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
 +    case (Db#db.validate_doc_funs /= []) orelse
 +        lists:any(
 +            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true;
 +            ({#doc{atts=Atts}, _Ref}) ->
 +                Atts /= []
 +            end, Docs2) of
 +    true ->
 +        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
 +        ExistingDocs = get_full_doc_infos(Db, Ids),
 +
 +        {DocBuckets2, DocErrors} =
 +                prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
 +        DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
 +    false ->
 +        DocErrors = [],
 +        DocBuckets3 = DocBuckets
 +    end,
 +    DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
 +            || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
 +    {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
 +    {ok, DocErrors};
 +
 +update_docs(Db, Docs, Options, interactive_edit) ->
 +    increment_stat(Db, {couchdb, database_writes}),
 +    AllOrNothing = lists:member(all_or_nothing, Options),
 +    % Generate the new revision ids for the documents, and separate out the
 +    % non-replicating (_local) documents from the rest.
 +
 +    % associate a reference with each doc in order to track duplicates
 +    Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs),
 +    {Docs3, NonRepDocs} = lists:foldl(
 +         fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
 +            case Id of
 +            <<?LOCAL_DOC_PREFIX, _/binary>> ->
 +                {DocsAcc, [Doc | NonRepDocsAcc]};
 +            Id ->
 +                {[Doc | DocsAcc], NonRepDocsAcc}
 +            end
 +        end, {[], []}, Docs2),
 +
 +    DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
 +
 +    case (Db#db.validate_doc_funs /= []) orelse
 +        lists:any(
 +            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) ->
 +                true;
 +            ({#doc{atts=Atts}, _Ref}) ->
 +                Atts /= []
 +            end, Docs3) of
 +    true ->
 +        % look up each doc by id and get the most recent full doc info
 +        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
 +        ExistingDocInfos = get_full_doc_infos(Db, Ids),
 +
 +        {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
 +                DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
 +
 +        % strip out any empty buckets
 +        DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
 +    false ->
 +        PreCommitFailures = [],
 +        DocBuckets2 = DocBuckets
 +    end,
 +
 +    if (AllOrNothing) and (PreCommitFailures /= []) ->
 +        {aborted,
-          lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
++         lists:foldl(fun({#doc{id=Id,revs=Revs}, Ref},Acc) ->
 +                         case lists:keyfind(Ref,1,PreCommitFailures) of
 +                         {Ref, Error} ->
-                              [{{Id,{Pos,RevIds}}, Error} | Acc];
++                             case Revs of
++                             {Pos, [RevId|_]} ->
++                                 [{{Id,{Pos, RevId}}, Error} | Acc];
++                             {0, []} ->
++                                 [{{Id,{0, <<>>}}, Error} | Acc]
++                             end;
 +                         false ->
 +                             Acc
 +                         end
 +                     end,[],Docs3)};
 +
 +    true ->
 +        Options2 = if AllOrNothing -> [merge_conflicts];
 +                true -> [] end ++ Options,
 +        DocBuckets3 = [[
 +                {doc_flush_atts(set_new_att_revpos(
 +                        check_dup_atts(Doc)), Db#db.fd), Ref}
 +                || {Doc, Ref} <- B] || B <- DocBuckets2],
 +        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
 +
 +        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
 +
 +        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
 +        {ok, lists:map(
 +            fun({#doc{}, Ref}) ->
 +                {ok, Result} = dict:find(Ref, ResultsDict),
 +                Result
 +            end, Docs2)}
 +    end.
 +
 +% Returns the first available document on disk. Input list is a full rev path
 +% for the doc.
 +make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
 +    nil;
 +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
 +    make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
 +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
 +    make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
 +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) ->
 +    Revs = [Rev || {Rev, _} <- DocPath],
 +    make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
 +
 +set_commit_option(Options) ->
 +    CommitSettings = {
 +        [true || O <- Options, O==full_commit orelse O==delay_commit],
 +        config:get("couchdb", "delayed_commits", "false")
 +    },
 +    case CommitSettings of
 +    {[true], _} ->
 +        Options; % user requested explicit commit setting, do not change it
 +    {_, "true"} ->
 +        Options; % delayed commits are enabled, do nothing
 +    {_, "false"} ->
 +        [full_commit|Options];
 +    {_, Else} ->
 +        ?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p",
 +            [Else]),
 +        [full_commit|Options]
 +    end.
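 +
 +% Resolution order: a single explicit full_commit or delay_commit in Options
 +% wins; otherwise the "couchdb"/"delayed_commits" config decides, and any
 +% value other than "true" or "false" is logged and treated as "false",
 +% i.e. full_commit is prepended.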
 +
 +collect_results(Pid, MRef, ResultsAcc) ->
 +    receive
 +    {result, Pid, Result} ->
 +        collect_results(Pid, MRef, [Result | ResultsAcc]);
 +    {done, Pid} ->
 +        {ok, ResultsAcc};
 +    {retry, Pid} ->
 +        retry;
 +    {'DOWN', MRef, _, _, Reason} ->
 +        exit(Reason)
 +    end.
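 +
 +% Message protocol, as handled above: the updater streams zero or more
 +% {result, Pid, Result} messages and closes with {done, Pid}; {retry, Pid}
 +% aborts collection, a 'DOWN' from the monitored updater re-raises its exit
 +% reason, and collected results accumulate in reverse arrival order.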
 +
 +write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
 +        NonRepDocs, Options0) ->
 +    DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
 +    Options = set_commit_option(Options0),
 +    MergeConflicts = lists:member(merge_conflicts, Options),
 +    FullCommit = lists:member(full_commit, Options),
 +    MRef = erlang:monitor(process, Pid),
 +    try
 +        Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
 +        case collect_results(Pid, MRef, []) of
 +        {ok, Results} -> {ok, Results};
 +        retry ->
 +            % This can happen if the db file we wrote to was swapped out by
 +            % compaction. Retry by reopening the db and writing to the
 +            % current file.
 +            {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
 +            DocBuckets2 = [
 +                [{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc, Ref} <- Bucket] ||
 +                Bucket <- DocBuckets1
 +            ],
 +            % We only retry once
 +            DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
 +            close(Db2),
 +            Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
 +            case collect_results(Pid, MRef, []) of
 +            {ok, Results} -> {ok, Results};
 +            retry -> throw({update_error, compaction_retry})
 +            end
 +        end
 +    after
 +        erlang:demonitor(MRef, [flush])
 +    end.
 +
 +
 +prepare_doc_summaries(Db, BucketList) ->
 +    [lists:map(
 +        fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
 +            DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
 +                #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
 +                    att_len = AL, disk_len = DL, encoding = E} <- Atts],
 +            AttsFd = case Atts of
 +            [#att{data = {Fd, _}} | _] ->
 +                Fd;
 +            [] ->
 +                nil
 +            end,
 +            SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
 +            {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
 +        end,
 +        Bucket) || Bucket <- BucketList].
 +
 +
 +before_docs_update(#db{before_doc_update = nil}, BucketList) ->
 +    BucketList;
 +before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
 +    [lists:map(
 +        fun({Doc, Ref}) ->
 +            NewDoc = Fun(couch_doc:with_ejson_body(Doc), Db),
 +            {NewDoc, Ref}
 +        end,
 +        Bucket) || Bucket <- BucketList].
 +
 +
 +set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
 +    Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
 +            % already committed to disk, do not set a new rev
 +            Att;
 +        (Att) ->
 +            Att#att{revpos=RevPos+1}
 +        end, Atts)}.
 +
 +
 +doc_flush_atts(Doc, Fd) ->
 +    Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
 +
 +check_md5(_NewSig, <<>>) -> ok;
 +check_md5(Sig, Sig) -> ok;
 +check_md5(_, _) -> throw(md5_mismatch).
 +
 +flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
 +    % already written to our file, nothing to write
 +    Att;
 +
 +flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
 +    disk_len=InDiskLen} = Att) ->
 +    {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
 +            couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
 +    check_md5(IdentityMd5, InMd5),
 +    Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
 +
 +flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
 +    with_stream(Fd, Att, fun(OutputStream) ->
 +        couch_stream:write(OutputStream, Data)
 +    end);
 +
 +flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
 +    MaxChunkSize = list_to_integer(
 +        config:get("couchdb", "attachment_stream_buffer_size", "4096")),
 +    with_stream(Fd, Att, fun(OutputStream) ->
 +        % Fun(MaxChunkSize, WriterFun) must call WriterFun
 +        % once for each chunk of the attachment.
 +        Fun(MaxChunkSize,
 +            % WriterFun({Length, Binary}, State)
 +            % WriterFun({0, _Footers}, State)
 +            % Called with Length == 0 on the last time.
 +            % WriterFun returns NewState.
 +            fun({0, Footers}, _) ->
 +                F = mochiweb_headers:from_binary(Footers),
 +                case mochiweb_headers:get_value("Content-MD5", F) of
 +                undefined ->
 +                    ok;
 +                Md5 ->
 +                    {md5, base64:decode(Md5)}
 +                end;
 +            ({_Length, Chunk}, _) ->
 +                couch_stream:write(OutputStream, Chunk)
 +            end, ok)
 +    end);
 +
 +flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
 +    with_stream(Fd, Att, fun(OutputStream) ->
 +        write_streamed_attachment(OutputStream, Fun, AttLen)
 +    end);
 +
 +flush_att(Fd, #att{data={follows, Parser, Ref}}=Att) when is_pid(Parser) ->
 +    ParserRef = erlang:monitor(process, Parser),
 +    Fun = fun() ->
 +        Parser ! {get_bytes, Ref, self()},
 +        receive
 +            {started_open_doc_revs, NewRef} ->
 +                couch_doc:restart_open_doc_revs(Parser, Ref, NewRef);
 +            {bytes, Ref, Bytes} ->
 +                Bytes;
 +            {'DOWN', ParserRef, _, _, Reason} ->
 +                throw({mp_parser_died, Reason})
 +        end
 +    end,
 +    try
 +        flush_att(Fd, Att#att{data=Fun})
 +    after
 +        erlang:demonitor(ParserRef, [flush])
 +    end.
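 +
 +% Dispatch summary for flush_att/2: data already on this Fd passes through
 +% untouched; data on another Fd is stream-copied with md5 verification;
 +% binaries, reader funs (with or without a known length) and
 +% {follows, Parser, Ref} bodies are written out through with_stream/3.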
 +
 +
 +compressible_att_type(MimeType) when is_binary(MimeType) ->
 +    compressible_att_type(?b2l(MimeType));
 +compressible_att_type(MimeType) ->
 +    TypeExpList = re:split(
 +        config:get("attachments", "compressible_types", ""),
 +        "\\s*,\\s*",
 +        [{return, list}]
 +    ),
 +    lists:any(
 +        fun(TypeExp) ->
 +            Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
 +                "(?:\\s*;.*?)?\\s*", $$],
 +            re:run(MimeType, Regexp, [caseless]) =/= nomatch
 +        end,
 +        [T || T <- TypeExpList, T /= []]
 +    ).
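 +
 +% Example (hypothetical config): with compressible_types set to
 +% "text/*, application/json", both
 +%   compressible_att_type(<<"text/plain; charset=utf-8">>)
 +%   compressible_att_type("application/json")
 +% return true, since each pattern is anchored and "*" expands to ".*".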
 +
 +% From RFC 2616 3.6.1 - Chunked Transfer Coding
 +%
 +%   In other words, the origin server is willing to accept
 +%   the possibility that the trailer fields might be silently
 +%   discarded along the path to the client.
 +%
 +% I take this to mean that if "Trailers: Content-MD5\r\n"
 +% is present in the request, but there is no Content-MD5
 +% trailer, we're free to ignore this inconsistency and
 +% pretend that no Content-MD5 exists.
 +with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
 +    BufferSize = list_to_integer(
 +        config:get("couchdb", "attachment_stream_buffer_size", "4096")),
 +    {ok, OutputStream} = case (Enc =:= identity) andalso
 +        compressible_att_type(Type) of
 +    true ->
 +        CompLevel = list_to_integer(
 +            config:get("attachments", "compression_level", "0")
 +        ),
 +        couch_stream:open(Fd, [{buffer_size, BufferSize},
 +            {encoding, gzip}, {compression_level, CompLevel}]);
 +    _ ->
 +        couch_stream:open(Fd, [{buffer_size, BufferSize}])
 +    end,
 +    ReqMd5 = case Fun(OutputStream) of
 +        {md5, FooterMd5} ->
 +            case InMd5 of
 +                md5_in_footer -> FooterMd5;
 +                _ -> InMd5
 +            end;
 +        _ ->
 +            InMd5
 +    end,
 +    {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
 +        couch_stream:close(OutputStream),
 +    check_md5(IdentityMd5, ReqMd5),
 +    {AttLen, DiskLen, NewEnc} = case Enc of
 +    identity ->
 +        case {Md5, IdentityMd5} of
 +        {Same, Same} ->
 +            {Len, IdentityLen, identity};
 +        _ ->
 +            {Len, IdentityLen, gzip}
 +        end;
 +    gzip ->
 +        case {Att#att.att_len, Att#att.disk_len} of
 +        {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
 +            % Compressed attachment uploaded through the standalone API.
 +            {Len, Len, gzip};
 +        {AL, DL} ->
 +            % This case is used for efficient push-replication, where a
 +            % compressed attachment is located in the body of a multipart
 +            % content-type request.
 +            {AL, DL, gzip}
 +        end
 +    end,
 +    Att#att{
 +        data={Fd,StreamInfo},
 +        att_len=AttLen,
 +        disk_len=DiskLen,
 +        md5=Md5,
 +        encoding=NewEnc
 +    }.
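 +
 +% Note on the identity branch above: differing stream/identity md5s mean the
 +% stream layer gzipped the data, so the stored encoding flips to gzip while
 +% disk_len keeps the uncompressed length.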
 +
 +
 +write_streamed_attachment(_Stream, _F, 0) ->
 +    ok;
 +write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
 +    Bin = read_next_chunk(F, LenLeft),
 +    ok = couch_stream:write(Stream, Bin),
 +    write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
 +
 +read_next_chunk(F, _) when is_function(F, 0) ->
 +    F();
 +read_next_chunk(F, LenLeft) when is_function(F, 1) ->
 +    F(lists:min([LenLeft, 16#2000])).
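 +
 +% Reader funs may be zero-arity (return the next chunk) or one-arity (take a
 +% byte budget); the budget is capped at 16#2000 (8 KiB) per read.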
 +
 +enum_docs_since_reduce_to_count(Reds) ->
 +    couch_btree:final_reduce(
 +            fun couch_db_updater:btree_by_seq_reduce/2, Reds).
 +
 +enum_docs_reduce_to_count(Reds) ->
 +    FinalRed = couch_btree:final_reduce(
 +            fun couch_db_updater:btree_by_id_reduce/2, Reds),
 +    element(1, FinalRed).
 +
 +changes_since(Db, StartSeq, Fun, Acc) ->
 +    changes_since(Db, StartSeq, Fun, [], Acc).
 +
 +changes_since(Db, StartSeq, Fun, Options, Acc) ->
 +    Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
 +        DocInfo = case FullDocInfo of
 +            #full_doc_info{} ->
 +                couch_doc:to_doc_info(FullDocInfo);
 +            #doc_info{} ->
 +                FullDocInfo
 +        end,
 +        Fun(DocInfo, Acc2)
 +    end,
 +    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree,
 +        Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
 +    {ok, AccOut}.
 +
 +count_changes_since(Db, SinceSeq) ->
 +    BTree = Db#db.seq_tree,
 +    {ok, Changes} =
 +    couch_btree:fold_reduce(BTree,
 +        fun(_SeqStart, PartialReds, 0) ->
 +            {ok, couch_btree:final_reduce(BTree, PartialReds)}
 +        end,
 +        0, [{start_key, SinceSeq + 1}]),
 +    Changes.
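 +
 +% Example (hypothetical db handle): count_changes_since(Db, 0) folds the
 +% by-seq btree reduction over every update seq greater than 0 and returns
 +% the number of changed docs.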
 +
 +enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
 +    {ok, LastReduction, AccOut} = couch_btree:fold(
 +        Db#db.seq_tree, InFun, Acc,
 +            [{start_key, SinceSeq + 1} | Options]),
 +    {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
 +
 +enum_docs(Db, InFun, InAcc, Options) ->
 +    FoldFun = skip_deleted(InFun),
 +    {ok, LastReduce, OutAcc} = couch_btree:fold(
 +        Db#db.id_tree, FoldFun, InAcc, Options),
 +    {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
 +
 +
 +%%% Internal functions %%%
 +open_doc_revs_int(Db, IdRevs, Options) ->
 +    Ids = [Id || {Id, _Revs} <- IdRevs],
 +    LookupResults = get_full_doc_infos(Db, Ids),
 +    lists:zipwith(
 +        fun({Id, Revs}, Lookup) ->
 +            case Lookup of
 +            {ok, #full_doc_info{rev_tree=RevTree}} ->
 +                {FoundRevs, MissingRevs} =
 +                case Revs of
 +                all ->
 +                    {couch_key_tree:get_all_leafs(RevTree), []};
 +                _ ->
 +                    case lists:member(latest, Options) of
 +                    true ->
 +                        couch_key_tree:get_key_leafs(RevTree, Revs);
 +                    false ->
 +                        couch_key_tree:get(RevTree, Revs)
 +                    end
 +                end,
 +                FoundResults =
 +                lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
 +                    case Value of
 +                    ?REV_MISSING ->
 +                        % we have the rev in our list but know nothing about it
 +                        {{not_found, missing}, {Pos, Rev}};
 +                    #leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
 +                        {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
 +                    end
 +                end, FoundRevs),
 +                Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
 +                {ok, Results};
 +            not_found when Revs == all ->
 +                {ok, []};
 +            not_found ->
 +                {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
 +            end
 +        end,
 +        IdRevs, LookupResults).
 +
 +open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
 +    case couch_btree:lookup(Db#db.local_tree, [Id]) of
 +    [{ok, {_, {Rev, BodyData}}}] ->
 +        Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
 +        apply_open_options({ok, Doc}, Options);
 +    [not_found] ->
 +        {not_found, missing}
 +    end;
 +open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
 +    #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
 +    Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
 +    apply_open_options(
 +       {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
 +open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
 +    #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
 +        DocInfo = couch_doc:to_doc_info(FullDocInfo),
 +    {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
 +    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
 +    apply_open_options(
 +        {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
 +open_doc_int(Db, Id, Options) ->
 +    case get_full_doc_info(Db, Id) of
 +    {ok, FullDocInfo} ->
 +        open_doc_int(Db, FullDocInfo, Options);
 +    not_found ->
 +        {not_found, missing}
 +    end.
 +
 +doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
 +    case lists:member(revs_info, Options) of
 +    false -> [];
 +    true ->
 +        {[{Pos, RevPath}],[]} =
 +            couch_key_tree:get_full_key_paths(RevTree, [Rev]),
 +
 +        [{revs_info, Pos, lists:map(
 +            fun({Rev1, ?REV_MISSING}) ->
 +                {Rev1, missing};
 +            ({Rev1, Leaf}) ->
 +                case Leaf#leaf.deleted of
 +                true ->
 +                    {Rev1, deleted};
 +                false ->
 +                    {Rev1, available}
 +                end
 +            end, RevPath)}]
 +    end ++
 +    case lists:member(conflicts, Options) of
 +    false -> [];
 +    true ->
 +        case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
 +        [] -> [];
 +        ConflictRevs -> [{conflicts, ConflictRevs}]
 +        end
 +    end ++
 +    case lists:member(deleted_conflicts, Options) of
 +    false -> [];
 +    true ->
 +        case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
 +        [] -> [];
 +        DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
 +        end
 +    end ++
 +    case lists:member(local_seq, Options) of
 +    false -> [];
 +    true -> [{local_seq, Seq}]
 +    end.
 +
 +read_doc(#db{fd=Fd}, Pos) ->
 +    couch_file:pread_term(Fd, Pos).
 +
 +
 +make_doc(#db{fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
 +    {BodyData, Atts} =
 +    case Bp of
 +    nil ->
 +        {[], []};
 +    _ ->
 +        {ok, {BodyData0, Atts00}} = read_doc(Db, Bp),
 +        Atts0 = case Atts00 of
 +        _ when is_binary(Atts00) ->
 +            couch_compress:decompress(Atts00);
 +        _ when is_list(Atts00) ->
 +            % pre 1.2 format
 +            Atts00
 +        end,
 +        {BodyData0,
 +            lists:map(
 +                fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
 +                    #att{name=Name,
 +                        type=Type,
 +                        att_len=AttLen,
 +                        disk_len=DiskLen,
 +                        md5=Md5,
 +                        revpos=RevPos,
 +                        data={Fd,Sp},
 +                        encoding=
 +                            case Enc of
 +                            true ->
 +                                % 0110 UPGRADE CODE
 +                                gzip;
 +                            false ->
 +                                % 0110 UPGRADE CODE
 +                                identity;
 +                            _ ->
 +                                Enc
 +                            end
 +                    };
 +                ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
 +                    #att{name=Name,
 +                        type=Type,
 +                        att_len=AttLen,
 +                        disk_len=AttLen,
 +                        md5=Md5,
 +                        revpos=RevPos,
 +                        data={Fd,Sp}};
 +                ({Name,{Type,Sp,AttLen}}) ->
 +                    #att{name=Name,
 +                        type=Type,
 +                        att_len=AttLen,
 +                        disk_len=AttLen,
 +                        md5= <<>>,
 +                        revpos=0,
 +                        data={Fd,Sp}}
 +                end, Atts0)}
 +    end,
 +    Doc = #doc{
 +        id = Id,
 +        revs = RevisionPath,
 +        body = BodyData,
 +        atts = Atts,
 +        deleted = Deleted
 +    },
 +    after_doc_read(Db, Doc).
 +
 +
 +after_doc_read(#db{after_doc_read = nil}, Doc) ->
 +    Doc;
 +after_doc_read(#db{after_doc_read = Fun} = Db, Doc) ->
 +    Fun(couch_doc:with_ejson_body(Doc), Db).
 +
 +
 +increment_stat(#db{options = Options}, Stat) ->
 +    case lists:member(sys_db, Options) of
 +    true ->
 +        ok;
 +    false ->
 +        couch_stats_collector:increment(Stat)
 +    end.
 +
 +skip_deleted(FoldFun) ->
 +    fun
 +        (visit, KV, Reds, Acc) ->
 +            FoldFun(KV, Reds, Acc);
 +        (traverse, _LK, {Undeleted, _Del, _Size}, Acc) when Undeleted == 0 ->
 +            {skip, Acc};
 +        (traverse, _, _, Acc) ->
 +            {ok, Acc}
 +    end.
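 +
 +% The traverse clause above relies on the by-id reduction being a
 +% {NotDeleted, Deleted, Size} tuple, so subtrees containing only deleted
 +% docs are skipped without visiting their leaves.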

