couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [40/57] [abbrv] [partial] inital move to rebar compilation
Date Tue, 07 Jan 2014 00:37:00 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview.erl b/apps/couch_mrview/src/couch_mrview.erl
new file mode 100644
index 0000000..d31ed18
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview.erl
@@ -0,0 +1,387 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview).
+
+-export([query_all_docs/2, query_all_docs/4]).
+-export([query_view/3, query_view/4, query_view/6]).
+-export([get_info/2]).
+-export([compact/2, compact/3, cancel_compaction/2]).
+-export([cleanup/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+%% Accumulator record threaded through the map, reduce and _all_docs folds
+%% in this module.
+-record(mracc, {
+    % database handle given to the fold entry point
+    db,
+    % set to true once a {meta, ...} event has been handed to the callback
+    meta_sent=false,
+    % total row count reported in the meta event (null for reduce folds)
+    total_rows,
+    % offset reported in the meta event; undefined until it is computed
+    offset,
+    % number of rows that may still be emitted; folding stops at 0
+    limit,
+    % number of rows still to be skipped before emitting
+    skip,
+    % grouping level used by the reduce fold (0, exact or an integer > 0)
+    group_level,
+    % #doc_info{} of the row currently being emitted, when one is known
+    doc_info,
+    % user callback, invoked as Callback(Event, UserAcc) -> {Go, UserAcc}
+    callback,
+    % accumulator owned by the user callback
+    user_acc,
+    % Go value returned by the most recent callback invocation
+    last_go=ok,
+    % fun that converts the fold's reductions into an offset
+    reduce_fun,
+    % update sequence added to the meta event when the query asks for it
+    update_seq,
+    % the #mrargs{} query arguments
+    args
+}).
+
+
+%% Run an _all_docs query, collecting every event with default_cb/2 into a
+%% list that starts out empty.
+query_all_docs(Db, Args) ->
+    Collector = fun default_cb/2,
+    query_all_docs(Db, Args, Collector, []).
+
+
+%% Run an _all_docs query, reporting events to Callback.
+%%
+%% Args is either a key/value list (converted with to_mrargs/1) or an
+%% #mrargs{} record. A signature is derived from the database info and passed
+%% to the optional preflight fun before the fold starts.
+query_all_docs(Db, Args, Callback, Acc) when is_list(Args) ->
+    query_all_docs(Db, to_mrargs(Args), Callback, Acc);
+query_all_docs(Db, Args0, Callback, Acc) ->
+    ComputeSig = fun(WDb) ->
+        {ok, DbInfo} = couch_db:get_db_info(WDb),
+        InfoBin = term_to_binary(DbInfo),
+        couch_index_util:hexsig(couch_util:md5(InfoBin))
+    end,
+    Sig = couch_util:with_db(Db, ComputeSig),
+    % _all_docs is always treated as a map view
+    MapArgs = Args0#mrargs{view_type=map},
+    ValidArgs = couch_mrview_util:validate_args(MapArgs),
+    Preflight = ValidArgs#mrargs.preflight_fun,
+    {ok, PreAcc} = if
+        is_function(Preflight, 2) -> Preflight(Sig, Acc);
+        true -> {ok, Acc}
+    end,
+    all_docs_fold(Db, ValidArgs, Callback, PreAcc).
+
+
+%% Query a view with default query arguments.
+query_view(Db, DDoc, VName) ->
+    DefaultArgs = #mrargs{},
+    query_view(Db, DDoc, VName, DefaultArgs).
+
+
+%% Query a view, collecting every event with default_cb/2 into a list.
+%% Args may be a key/value list or an #mrargs{} record.
+query_view(Db, DDoc, VName, Args) ->
+    MrArgs = case is_list(Args) of
+        true -> to_mrargs(Args);
+        false -> Args
+    end,
+    query_view(Db, DDoc, VName, MrArgs, fun default_cb/2, []).
+
+
+%% Query a view, reporting events to Callback.
+%%
+%% The view is resolved through couch_mrview_util:get_view/4; the optional
+%% preflight fun receives the returned signature before folding begins.
+query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
+    query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
+query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
+    {ok, VInfo, Sig, Args} = couch_mrview_util:get_view(Db, DDoc, VName, Args0),
+    Preflight = Args#mrargs.preflight_fun,
+    {ok, PreAcc} = if
+        is_function(Preflight, 2) -> Preflight(Sig, Acc0);
+        true -> {ok, Acc0}
+    end,
+    query_view(Db, VInfo, Args, Callback, PreAcc).
+
+
+%% Dispatch an already resolved view to the matching fold.
+%%
+%% The second argument is {map, View} or {red, View}; any other shape fails
+%% with function_clause.
+query_view(Db, {map, View}, Args, Callback, Acc) ->
+    map_fold(Db, View, Args, Callback, Acc);
+query_view(Db, {red, View}, Args, Callback, Acc) ->
+    red_fold(Db, View, Args, Callback, Acc).
+
+
+%% Return whatever couch_index:get_info/1 reports for the view index of DDoc.
+get_info(Db, DDoc) ->
+    {ok, IndexPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+    couch_index:get_info(IndexPid).
+
+
+%% Compact the view index of DDoc with an empty option list.
+compact(Db, DDoc) ->
+    NoOpts = [],
+    compact(Db, DDoc, NoOpts).
+
+
+%% Ask the index process of DDoc to compact, forwarding Opts unchanged to
+%% couch_index:compact/2.
+compact(Db, DDoc, Opts) ->
+    {ok, IndexPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+    couch_index:compact(IndexPid, Opts).
+
+
+%% Cancel a compaction of DDoc's view index and delete its compaction file.
+cancel_compaction(Db, DDoc) ->
+    {ok, IndexPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+    {ok, CompactorPid} = couch_index:get_compactor_pid(IndexPid),
+    ok = couch_index_compactor:cancel(CompactorPid),
+
+    % The index state carries the signature and database name needed to
+    % locate the compaction file.
+    {ok, #mrst{sig=Sig, db_name=DbName}} = couch_index:get_state(IndexPid, 0),
+    couch_mrview_util:delete_compaction_file(DbName, Sig),
+    ok.
+
+
+%% Delegate to couch_mrview_cleanup:run/1 for Db and return its result.
+cleanup(Db) ->
+    couch_mrview_cleanup:run(Db).
+
+
+%% Fold over the documents of Db for an _all_docs style query.
+%%
+%% Without keys (first clause) the documents are enumerated through
+%% couch_db:enum_docs/4 with map_fold/3 as the fold function. With keys
+%% (second clause) each key is looked up on its own and the resulting row is
+%% pushed through map_fold/3.
+%%
+%% Both clauses finish through finish_fold/2.
+all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    Total = couch_util:get_value(doc_count, Info),
+    UpdateSeq = couch_db:get_update_seq(Db),
+    Acc = #mracc{
+        db=Db,
+        total_rows=Total,
+        limit=Args#mrargs.limit,
+        skip=Args#mrargs.skip,
+        callback=Callback,
+        user_acc=UAcc,
+        reduce_fun=fun couch_mrview_util:all_docs_reduce_to_count/1,
+        update_seq=UpdateSeq,
+        args=Args
+    },
+    % exactly one option list is expected when no keys were given
+    [Opts] = couch_mrview_util:all_docs_key_opts(Args),
+    {ok, Offset, FinalAcc} = couch_db:enum_docs(Db, fun map_fold/3, Acc, Opts),
+    finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]);
+all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    Total = couch_util:get_value(doc_count, Info),
+    UpdateSeq = couch_db:get_update_seq(Db),
+    Acc = #mracc{
+        db=Db,
+        total_rows=Total,
+        limit=Args#mrargs.limit,
+        skip=Args#mrargs.skip,
+        callback=Callback,
+        user_acc=UAcc,
+        reduce_fun=fun couch_mrview_util:all_docs_reduce_to_count/1,
+        update_seq=UpdateSeq,
+        args=Args
+    },
+    % Backwards compatibility hack. The old _all_docs iterates keys
+    % in reverse if descending=true was passed. Here we'll just
+    % reverse the list instead.
+    Keys = if Dir =:= fwd -> Keys0; true -> lists:reverse(Keys0) end,
+
+    FoldFun = fun(Key, Acc0) ->
+        % a missing document shows up here as the atom not_found, whether
+        % it was returned or thrown by get_doc_info/2
+        DocInfo = (catch couch_db:get_doc_info(Db, Key)),
+        {Doc, Acc1} = case DocInfo of
+            {ok, #doc_info{id=Id, revs=[RevInfo | _RestRevs]}=DI} ->
+                % the row value is built from the first listed revision;
+                % deleted documents are kept and flagged
+                Rev = couch_doc:rev_to_str(RevInfo#rev_info.rev),
+                Props = [{rev, Rev}] ++ case RevInfo#rev_info.deleted of
+                    true -> [{deleted, true}];
+                    false -> []
+                end,
+                {{{Id, Id}, {Props}}, Acc0#mracc{doc_info=DI}};
+            not_found ->
+                % placeholder row: the id is the atom error, the value is
+                % not_found
+                {{{Key, error}, not_found}, Acc0}
+        end,
+        % fixed reductions term; map_fold/3 hands it to reduce_fun when it
+        % builds the meta event. The Go value is ignored, so every key is
+        % visited.
+        {_, Acc2} = map_fold(Doc, {[], [{0, 0, 0}]}, Acc1),
+        Acc2
+    end,
+    FinalAcc = lists:foldl(FoldFun, Acc, Keys),
+    finish_fold(FinalAcc, [{total, Total}]).
+
+
+%% Fold over the rows of a map view, reporting them to Callback.
+%%
+%% One fold is run per option list returned by couch_mrview_util:key_opts/1;
+%% the reductions of the last fold determine the offset handed to
+%% finish_fold/2.
+map_fold(Db, View, Args, Callback, UAcc) ->
+    {ok, Total} = couch_mrview_util:get_row_count(View),
+    InitAcc = #mracc{
+        db=Db,
+        total_rows=Total,
+        limit=Args#mrargs.limit,
+        skip=Args#mrargs.skip,
+        callback=Callback,
+        user_acc=UAcc,
+        reduce_fun=fun couch_mrview_util:reduce_to_count/1,
+        update_seq=View#mrview.update_seq,
+        args=Args
+    },
+    KeyOpts = couch_mrview_util:key_opts(Args),
+    FoldRange = fun(Opts, {_PrevReds, AccIn}) ->
+        {ok, Reds, AccOut} =
+            couch_mrview_util:fold(View, fun map_fold/3, AccIn, Opts),
+        {Reds, AccOut}
+    end,
+    {LastReds, FinalAcc} = lists:foldl(FoldRange, {nil, InitAcc}, KeyOpts),
+    Offset = couch_mrview_util:reduce_to_count(LastReds),
+    finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]).
+
+
+%% Per-row fold function for map views and _all_docs.
+%%
+%% Returns {ok, Acc} to continue or {stop, Acc} to end the fold. The clause
+%% order is significant: convert #full_doc_info{}, then skip, then send the
+%% meta event once, then enforce the limit, then emit the row.
+map_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
+    % matches for _all_docs and translates #full_doc_info{} -> KV pair
+    case couch_doc:to_doc_info(FullDocInfo) of
+        #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
+            Value = {[{rev, couch_doc:rev_to_str(Rev)}]},
+            map_fold({{Id, Id}, Value}, OffsetReds, Acc#mracc{doc_info=DI});
+        #doc_info{revs=[#rev_info{deleted=true}|_]} ->
+            % deleted documents produce no row and do not consume skip/limit
+            {ok, Acc}
+    end;
+map_fold(_KV, _Offset, #mracc{skip=N}=Acc) when N > 0 ->
+    % still skipping: drop the row without invoking the callback
+    {ok, Acc#mracc{skip=N-1, last_go=ok}};
+map_fold(KV, OffsetReds, #mracc{offset=undefined}=Acc) ->
+    % first row that is not skipped: compute the offset and send the meta
+    % event before the row itself
+    #mracc{
+        total_rows=Total,
+        callback=Callback,
+        user_acc=UAcc0,
+        reduce_fun=Reduce,
+        update_seq=UpdateSeq,
+        args=Args
+    } = Acc,
+    Offset = Reduce(OffsetReds),
+    Meta = make_meta(Args, UpdateSeq, [{total, Total}, {offset, Offset}]),
+    {Go, UAcc1} = Callback(Meta, UAcc0),
+    Acc1 = Acc#mracc{meta_sent=true, offset=Offset, user_acc=UAcc1, last_go=Go},
+    case Go of
+        % re-enter with the same row; offset is now set, so a later clause
+        % handles it
+        ok -> map_fold(KV, OffsetReds, Acc1);
+        stop -> {stop, Acc1}
+    end;
+map_fold(_KV, _Offset, #mracc{limit=0}=Acc) ->
+    % limit exhausted
+    {stop, Acc};
+map_fold({{Key, Id}, Val}, _Offset, Acc) ->
+    #mracc{
+        db=Db,
+        limit=Limit,
+        doc_info=DI,
+        callback=Callback,
+        user_acc=UAcc0,
+        args=Args
+    } = Acc,
+    % use the #doc_info{} stored in the accumulator when there is one,
+    % otherwise let maybe_load_doc/4 work from the row's id and value
+    Doc = case DI of
+        #doc_info{} -> couch_mrview_util:maybe_load_doc(Db, DI, Args);
+        _ -> couch_mrview_util:maybe_load_doc(Db, Id, Val, Args)
+    end,
+    Row = [{id, Id}, {key, Key}, {value, Val}] ++ Doc,
+    {Go, UAcc1} = Callback({row, Row}, UAcc0),
+    % doc_info is cleared so it cannot leak into the next row
+    {Go, Acc#mracc{
+        limit=Limit-1,
+        doc_info=undefined,
+        user_acc=UAcc1,
+        last_go=Go
+    }}.
+
+
+%% Fold over the reduced rows of a reduce view, reporting them to Callback.
+%%
+%% Rows are grouped with the fun returned by group_rows_fun/1 for the
+%% requested group level; one fold is run per option list from
+%% couch_mrview_util:key_opts/2.
+red_fold(Db, {_Nth, _Lang, View}=RedView, Args, Callback, UAcc) ->
+    GroupLevel = Args#mrargs.group_level,
+    InitAcc = #mracc{
+        db=Db,
+        total_rows=null,
+        limit=Args#mrargs.limit,
+        skip=Args#mrargs.skip,
+        group_level=GroupLevel,
+        callback=Callback,
+        user_acc=UAcc,
+        update_seq=View#mrview.update_seq,
+        args=Args
+    },
+    ExtraOpts = [{key_group_fun, group_rows_fun(GroupLevel)}],
+    KeyOpts = couch_mrview_util:key_opts(Args, ExtraOpts),
+    FoldRange = fun(Opts, AccIn) ->
+        {ok, AccOut} =
+            couch_mrview_util:fold_reduce(RedView, fun red_fold/3, AccIn, Opts),
+        AccOut
+    end,
+    FinalAcc = lists:foldl(FoldRange, InitAcc, KeyOpts),
+    finish_fold(FinalAcc, []).
+
+%% Per-row fold function for reduce views.
+%%
+%% Returns {Go, Acc} where Go is the callback's verdict (or ok/stop for the
+%% skip and limit clauses). The clause order is significant: skip, then send
+%% the meta event once, then enforce the limit, then emit the row with a key
+%% shaped according to group_level.
+red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 ->
+    % still skipping: drop the row without invoking the callback
+    {ok, Acc#mracc{skip=N-1, last_go=ok}};
+red_fold(Key, Red, #mracc{meta_sent=false}=Acc) ->
+    % first row that is not skipped: send the meta event before the row
+    #mracc{
+        args=Args,
+        callback=Callback,
+        user_acc=UAcc0,
+        update_seq=UpdateSeq
+    } = Acc,
+    Meta = make_meta(Args, UpdateSeq, []),
+    {Go, UAcc1} = Callback(Meta, UAcc0),
+    Acc1 = Acc#mracc{user_acc=UAcc1, meta_sent=true, last_go=Go},
+    case Go of
+        % re-enter with the same row now that meta_sent is true
+        ok -> red_fold(Key, Red, Acc1);
+        _ -> {Go, Acc1}
+    end;
+red_fold(_Key, _Red, #mracc{limit=0} = Acc) ->
+    % limit exhausted
+    {stop, Acc};
+red_fold(_Key, Red, #mracc{group_level=0} = Acc) ->
+    % no grouping: the row key is null
+    #mracc{
+        limit=Limit,
+        callback=Callback,
+        user_acc=UAcc0
+    } = Acc,
+    Row = [{key, null}, {value, Red}],
+    {Go, UAcc1} = Callback({row, Row}, UAcc0),
+    {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(Key, Red, #mracc{group_level=exact} = Acc) ->
+    % exact grouping: the row key is the full key
+    #mracc{
+        limit=Limit,
+        callback=Callback,
+        user_acc=UAcc0
+    } = Acc,
+    Row = [{key, Key}, {value, Red}],
+    {Go, UAcc1} = Callback({row, Row}, UAcc0),
+    {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0, is_list(K) ->
+    % list key with a numeric group level: keep only the first I elements
+    #mracc{
+        limit=Limit,
+        callback=Callback,
+        user_acc=UAcc0
+    } = Acc,
+    Row = [{key, lists:sublist(K, I)}, {value, Red}],
+    {Go, UAcc1} = Callback({row, Row}, UAcc0),
+    {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0 ->
+    % non-list key with a numeric group level: the key is used unchanged
+    #mracc{
+        limit=Limit,
+        callback=Callback,
+        user_acc=UAcc0
+    } = Acc,
+    Row = [{key, K}, {value, Red}],
+    {Go, UAcc1} = Callback({row, Row}, UAcc0),
+    {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}.
+
+
+%% Wrap up a fold and return {ok, UserAcc}.
+%%
+%% If the last callback verdict was ok, the meta event is sent when it has
+%% not been sent yet and then, unless the callback objects, the complete
+%% event. For any other verdict the user accumulator is returned as is.
+finish_fold(#mracc{last_go=ok}=Acc, ExtraMeta) ->
+    #mracc{
+        callback=Callback,
+        user_acc=UAcc0,
+        args=Args,
+        update_seq=UpdateSeq,
+        meta_sent=MetaSent
+    } = Acc,
+    Meta = make_meta(Args, UpdateSeq, ExtraMeta),
+    % Possibly send meta info
+    {Go, UAcc1} = case MetaSent of
+        false -> Callback(Meta, UAcc0);
+        _ -> {ok, UAcc0}
+    end,
+    % Notify callback that the fold is complete.
+    case Go of
+        ok ->
+            {_, UAcc2} = Callback(complete, UAcc1),
+            {ok, UAcc2};
+        _ ->
+            {ok, UAcc1}
+    end;
+finish_fold(#mracc{user_acc=UAcc}, _ExtraMeta) ->
+    {ok, UAcc}.
+
+
+%% Build the {meta, Props} event. Props is Base, followed by
+%% {update_seq, UpdateSeq} when the query arguments have update_seq=true.
+make_meta(Args, UpdateSeq, Base) ->
+    Props = case Args#mrargs.update_seq =:= true of
+        true -> Base ++ [{update_seq, UpdateSeq}];
+        false -> Base
+    end,
+    {meta, Props}.
+
+
+%% Return a fun({KeyA, _}, {KeyB, _}) -> boolean() telling whether two rows
+%% fall into the same group.
+%%
+%%   exact   - keys must compare equal
+%%   0       - every row is in the same group
+%%   integer - non-empty list keys are compared on their first GroupLevel
+%%             elements, anything else on the whole key
+group_rows_fun(exact) ->
+    fun({KeyA, _}, {KeyB, _}) -> KeyA == KeyB end;
+group_rows_fun(0) ->
+    fun(_, _) -> true end;
+group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
+    fun({KeyA, _}, {KeyB, _}) ->
+        case {KeyA, KeyB} of
+            {[_|_], [_|_]} ->
+                PrefixA = lists:sublist(KeyA, GroupLevel),
+                PrefixB = lists:sublist(KeyB, GroupLevel),
+                PrefixA == PrefixB;
+            _ ->
+                KeyA == KeyB
+        end
+    end.
+
+
+%% Default callback: collects events into a list.
+%%
+%% Events are prepended as they arrive and the list is reversed on complete.
+%% A {final, Info} event becomes the sole element when nothing was collected
+%% and is ignored otherwise.
+default_cb(Event, Acc) ->
+    case {Event, Acc} of
+        {complete, _} -> {ok, lists:reverse(Acc)};
+        {{final, Info}, []} -> {ok, [Info]};
+        {{final, _}, _} -> {ok, Acc};
+        {Row, _} -> {ok, [Row | Acc]}
+    end.
+
+
+%% Convert a list of {Key, Value} pairs into an #mrargs{} record.
+%% Each key is turned into an existing atom and must name a record field.
+to_mrargs(KeyList) ->
+    SetField = fun({Key, Value}, Args) ->
+        Field = couch_util:to_existing_atom(Key),
+        Position = lookup_index(Field),
+        setelement(Position, Args, Value)
+    end,
+    lists:foldl(SetField, #mrargs{}, KeyList).
+
+
+%% Return the tuple position of field Key in an #mrargs{} record, or whatever
+%% couch_util:get_value/2 yields for an unknown key.
+lookup_index(Key) ->
+    Fields = record_info(fields, mrargs),
+    % position 1 holds the record tag, so fields start at 2
+    Positions = lists:seq(2, record_info(size, mrargs)),
+    couch_util:get_value(Key, lists:zip(Fields, Positions)).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_cleanup.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_cleanup.erl b/apps/couch_mrview/src/couch_mrview_cleanup.erl
new file mode 100644
index 0000000..0593570
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_cleanup.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_cleanup).
+
+-export([run/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+%% Delete view index files of Db that no longer belong to a design document.
+%%
+%% The index and compaction file names of every non-deleted design document
+%% are computed; any other file found in the database's mrview index
+%% directory is deleted.
+run(Db) ->
+    RootDir = couch_index_util:root_dir(),
+    DbName = couch_db:name(Db),
+
+    DesignDocs = couch_db:get_design_docs(Db),
+    LiveDDocs = [DD || DD <- DesignDocs, DD#full_doc_info.deleted == false],
+    FilesFor = fun(DDocInfo) ->
+        {ok, DDoc} = couch_db:open_doc_int(Db, DDocInfo, [ejson_body]),
+        {ok, InitState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+        Sig = InitState#mrst.sig,
+        IndexFile = couch_mrview_util:index_file(DbName, Sig),
+        CompactFile = couch_mrview_util:compaction_file(DbName, Sig),
+        [IndexFile, CompactFile]
+    end,
+    SigFiles = lists:flatmap(FilesFor, LiveDDocs),
+
+    IdxDir = couch_index_util:index_dir(mrview, DbName),
+    DiskFiles = filelib:wildcard(filename:join(IdxDir, "*")),
+
+    % Whatever is on disk but not accounted for by a design doc is stale.
+    Stale = DiskFiles -- SigFiles,
+    DeleteFile = fun(FN) ->
+        ?LOG_DEBUG("Deleting stale view file: ~s", [FN]),
+        couch_file:delete(RootDir, FN, false)
+    end,
+    lists:foreach(DeleteFile, Stale),
+
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_compactor.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_compactor.erl b/apps/couch_mrview/src/couch_mrview_compactor.erl
new file mode 100644
index 0000000..4ab8dd1
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_compactor.erl
@@ -0,0 +1,178 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_compactor).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([compact/3, swap_compacted/2]).
+
+%% Accumulator used while copying btree entries into the compaction file.
+-record(acc, {
+   % btree being written to
+   btree = nil,
+   % doc id of the previously copied id-btree entry (duplicate detection)
+   last_id = nil,
+   % entries buffered since the last flush, newest first
+   kvs = [],
+   % accumulated ?term_size/1 of the buffered entries
+   kvs_size = 0,
+   % number of entries copied so far, used for progress reporting
+   changes = 0,
+   % number of entries expected in total, used for progress reporting
+   total_changes
+}).
+
+
+%% Compact the index described by State. When Opts contains the atom
+%% recompact, recompact/1 is used instead of compact/1.
+compact(_Db, State, Opts) ->
+    Recompact = lists:member(recompact, Opts),
+    if
+        Recompact -> recompact(State);
+        true -> compact(State)
+    end.
+
+%% Copy the id btree and every view btree of State into a fresh compaction
+%% file and return {ok, NewState} whose btrees live in that file.
+%%
+%% Progress is reported through couch_task_status. Exits with
+%% {view_duplicate_id, DocId} if the id btree yields the same doc id twice
+%% in a row.
+compact(State) ->
+    #mrst{
+        db_name=DbName,
+        idx_name=IdxName,
+        sig=Sig,
+        update_seq=Seq,
+        id_btree=IdBtree,
+        views=Views
+    } = State,
+
+    % Open the compaction file, obtain an empty index state on it via
+    % couch_mrview_util:reset_index/3, and read the first element of the
+    % by-id btree's full reduction (presumably the document count -- TODO
+    % confirm).
+    {EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) ->
+        CompactFName = couch_mrview_util:compaction_file(DbName, Sig),
+        {ok, Fd} = couch_mrview_util:open_file(CompactFName),
+        ESt = couch_mrview_util:reset_index(Db, Fd, State),
+
+        {ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+        Count = element(1, DbReduce),
+
+        {ESt, Count}
+    end),
+
+    #mrst{
+        id_btree = EmptyIdBtree,
+        views = EmptyViews
+    } = EmptyState,
+
+    % expected amount of work: NumDocIds plus the row count of every view
+    TotalChanges = lists:foldl(
+        fun(View, Acc) ->
+            {ok, Kvs} = couch_mrview_util:get_row_count(View),
+            Acc + Kvs
+        end,
+        NumDocIds, Views),
+    couch_task_status:add_task([
+        {type, view_compaction},
+        {database, DbName},
+        {design_document, IdxName},
+        {progress, 0}
+    ]),
+
+    % flush threshold for buffered entries, read as a string from the config
+    BufferSize0 = couch_config:get(
+        "view_compaction", "keyvalue_buffer_size", "2097152"
+    ),
+    BufferSize = list_to_integer(BufferSize0),
+
+    % Copies one id-btree entry: buffers it, and writes the buffer to the
+    % new btree once its accumulated size reaches BufferSize.
+    FoldFun = fun({DocId, _} = KV, Acc) ->
+        #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
+        if DocId =:= LastId ->
+            % COUCHDB-999 regression test
+            ?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`"
+                ++ ", database `~s` - This view needs to be rebuilt.",
+                [DocId, IdxName, DbName]
+            ),
+            exit({view_duplicate_id, DocId});
+        true -> ok end,
+        KvsSize2 = KvsSize + ?term_size(KV),
+        case KvsSize2 >= BufferSize of
+            true ->
+                % the buffer is kept newest first, so reverse before adding
+                {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+                Acc2 = update_task(Acc, 1 + length(Kvs)),
+                {ok, Acc2#acc{
+                    btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
+            _ ->
+                {ok, Acc#acc{
+                    kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
+        end
+    end,
+
+    InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
+    {ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc),
+    % write whatever is still buffered after the fold
+    #acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
+    {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
+
+    % copy each view into its empty counterpart, threading the progress
+    % accumulator through
+    {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
+        compact_view(View, EmptyView, BufferSize, Acc)
+    end, FinalAcc2, lists:zip(Views, EmptyViews)),
+
+    unlink(EmptyState#mrst.fd),
+    {ok, EmptyState#mrst{
+        id_btree=NewIdBtree,
+        views=NewViews,
+        update_seq=Seq
+    }}.
+
+
+%% Run couch_index_updater:update/2 on State in a monitored process and
+%% return {ok, NewState} once it exits with {updated, Pid, NewState}.
+%%
+%% If the updater terminates with any other reason this process exits with
+%% {recompact_failed, Reason} instead of waiting forever for a 'DOWN'
+%% message that can no longer arrive.
+recompact(State) ->
+    link(State#mrst.fd),
+    {Pid, Ref} = erlang:spawn_monitor(fun() ->
+        couch_index_updater:update(couch_mrview_index, State)
+    end),
+    receive
+        {'DOWN', Ref, _, _, {updated, Pid, State2}} ->
+            unlink(State#mrst.fd),
+            {ok, State2};
+        {'DOWN', Ref, _, _, Reason} ->
+            % the updater died without handing back an updated state
+            unlink(State#mrst.fd),
+            exit({recompact_failed, Reason})
+    end.
+
+
+%% @spec compact_view(View, EmptyView, BufferSize, Acc) -> {CompactView, NewAcc}
+%%
+%% Copy every entry of View's btree into EmptyView's btree. Entries are
+%% buffered and written out whenever their accumulated ?term_size/1 reaches
+%% BufferSize; progress is recorded through update_task/2.
+compact_view(View, EmptyView, BufferSize, Acc0) ->
+    CopyKV = fun(KV, #acc{btree = Bt, kvs = Pending, kvs_size = PendingSize} = Acc) ->
+        NewSize = PendingSize + ?term_size(KV),
+        case NewSize >= BufferSize of
+            true ->
+                % the buffer is kept newest first, so reverse before adding
+                {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Pending])),
+                Flushed = update_task(Acc, 1 + length(Pending)),
+                {ok, Flushed#acc{btree = Bt2, kvs = [], kvs_size = 0}};
+            false ->
+                {ok, Acc#acc{kvs = [KV | Pending], kvs_size = NewSize}}
+        end
+    end,
+
+    StartAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyView#mrview.btree},
+    {ok, _, FoldedAcc} = couch_btree:foldl(View#mrview.btree, CopyKV, StartAcc),
+    % write whatever is still buffered after the fold
+    #acc{btree = LastBt, kvs = Leftover} = FoldedAcc,
+    {ok, NewBt} = couch_btree:add(LastBt, lists:reverse(Leftover)),
+    DoneAcc = update_task(FoldedAcc, length(Leftover)),
+    {EmptyView#mrview{btree=NewBt}, DoneAcc}.
+
+
+%% Add ChangesInc to the number of copied entries and publish the resulting
+%% progress percentage through couch_task_status:update/1.
+%%
+%% When the expected total is 0 (nothing to copy) the progress is reported as
+%% 0 rather than dividing by zero, which would raise badarith.
+update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) ->
+    Changes2 = Changes + ChangesInc,
+    Progress = case Total of
+        0 -> 0;
+        _ -> (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{progress, Progress}]),
+    Acc#acc{changes = Changes2}.
+
+
+%% Replace the index file with the compaction file and return
+%% {ok, NewState} carrying a fresh ref counter for the new file descriptor.
+%%
+%% The old index file is deleted before the compaction file is renamed into
+%% its place; afterwards the old descriptor is unlinked and the old ref
+%% counter dropped.
+swap_compacted(OldState, NewState) ->
+    #mrst{sig=Sig, db_name=DbName} = NewState,
+    NewFd = NewState#mrst.fd,
+
+    link(NewFd),
+
+    RootDir = couch_index_util:root_dir(),
+    IndexFile = couch_mrview_util:index_file(DbName, Sig),
+    CompactFile = couch_mrview_util:compaction_file(DbName, Sig),
+    ok = couch_file:delete(RootDir, IndexFile),
+    ok = file:rename(CompactFile, IndexFile),
+
+    unlink(OldState#mrst.fd),
+    couch_ref_counter:drop(OldState#mrst.refc),
+    {ok, RefCounter} = couch_ref_counter:start([NewFd]),
+
+    {ok, NewState#mrst{refc=RefCounter}}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_http.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_http.erl b/apps/couch_mrview/src/couch_mrview_http.erl
new file mode 100644
index 0000000..22e0dc3
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_http.erl
@@ -0,0 +1,409 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_http).
+
+-export([
+    handle_all_docs_req/2,
+    handle_view_req/3,
+    handle_temp_view_req/2,
+    handle_info_req/3,
+    handle_compact_req/3,
+    handle_cleanup_req/2,
+    parse_qs/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+%% Accumulator handed to the view callbacks while a response is streamed.
+-record(vacc, {
+    % database the request operates on
+    db,
+    % the #httpd{} request
+    req,
+    % response handle; undefined until the response has been started
+    resp,
+    % text sent in front of the next row (row separator)
+    prepend,
+    % ETag computed by the preflight fun, sent as a response header
+    etag
+}).
+
+
+%% HTTP handler for _all_docs. GET queries without keys, POST reads the keys
+%% from the JSON body, anything else is answered with method-not-allowed.
+handle_all_docs_req(#httpd{method='GET'}=Req, Db) ->
+    NoKeys = undefined,
+    all_docs_req(Req, Db, NoKeys);
+handle_all_docs_req(#httpd{method='POST'}=Req, Db) ->
+    Body = couch_httpd:json_body_obj(Req),
+    all_docs_req(Req, Db, get_view_keys(Body));
+handle_all_docs_req(Req, _Db) ->
+    couch_httpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+
+%% HTTP handler for a design document view. The view name is the fifth path
+%% segment. GET queries without keys, POST reads the keys from the JSON
+%% body; both bump the {httpd, view_reads} counter.
+handle_view_req(#httpd{method='GET', path_parts=Parts}=Req, Db, DDoc) ->
+    [_, _, _, _, ViewName] = Parts,
+    couch_stats_collector:increment({httpd, view_reads}),
+    design_doc_view(Req, Db, DDoc, ViewName, undefined);
+handle_view_req(#httpd{method='POST', path_parts=Parts}=Req, Db, DDoc) ->
+    [_, _, _, _, ViewName] = Parts,
+    Body = couch_httpd:json_body_obj(Req),
+    Keys = get_view_keys(Body),
+    couch_stats_collector:increment({httpd, view_reads}),
+    design_doc_view(Req, Db, DDoc, ViewName, Keys);
+handle_view_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+
+%% HTTP handler for temporary views (POST only, admin only). The JSON body
+%% is turned into a design document and queried as the view <<"temp">>.
+handle_temp_view_req(#httpd{method='POST'}=Req, Db) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    ok = couch_db:check_is_admin(Db),
+    {Props} = couch_httpd:json_body_obj(Req),
+    JsonBody = {Props},
+    TempDDoc = couch_mrview_util:temp_view_to_ddoc(JsonBody),
+    Keys = get_view_keys(JsonBody),
+    couch_stats_collector:increment({httpd, temporary_view_reads}),
+    design_doc_view(Req, Db, TempDDoc, <<"temp">>, Keys);
+handle_temp_view_req(Req, _Db) ->
+    couch_httpd:send_method_not_allowed(Req, "POST").
+
+
+%% HTTP handler returning view index information for a design document
+%% (GET only). The reported name is the third of the four path segments.
+handle_info_req(#httpd{method='GET', path_parts=Parts}=Req, Db, DDoc) ->
+    [_, _, Name, _] = Parts,
+    {ok, Info} = couch_mrview:get_info(Db, DDoc),
+    Body = {[
+        {name, Name},
+        {view_index, {Info}}
+    ]},
+    couch_httpd:send_json(Req, 200, Body);
+handle_info_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_method_not_allowed(Req, "GET").
+
+
+%% HTTP handler that starts compaction of a design document's view index
+%% (POST only, admin only). Answers 202 with {"ok": true}.
+handle_compact_req(#httpd{method='POST'}=Req, Db, DDoc) ->
+    ok = couch_db:check_is_admin(Db),
+    couch_httpd:validate_ctype(Req, "application/json"),
+    ok = couch_mrview:compact(Db, DDoc),
+    Accepted = {[{ok, true}]},
+    couch_httpd:send_json(Req, 202, Accepted);
+handle_compact_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_method_not_allowed(Req, "POST").
+
+
+%% HTTP handler that runs the view cleanup for a database (POST only, admin
+%% only). Answers 202 with {"ok": true}.
+handle_cleanup_req(#httpd{method='POST'}=Req, Db) ->
+    ok = couch_db:check_is_admin(Db),
+    couch_httpd:validate_ctype(Req, "application/json"),
+    ok = couch_mrview:cleanup(Db),
+    Accepted = {[{ok, true}]},
+    couch_httpd:send_json(Req, 202, Accepted);
+handle_cleanup_req(Req, _Db) ->
+    couch_httpd:send_method_not_allowed(Req, "POST").
+
+
+%% Serve an _all_docs request after checking that the caller may list Db.
+%%
+%% Throws {forbidden, Msg} when Db is a system database, the caller is not
+%% an admin, and Db is not a publicly readable users database.
+all_docs_req(Req, Db, Keys) ->
+    case all_docs_allowed(Db) of
+        true ->
+            do_all_docs_req(Req, Db, Keys);
+        false ->
+            throw({forbidden, <<"Only admins can access _all_docs",
+                                " of system databases.">>})
+    end.
+
+%% true when _all_docs may be served for Db: it is not a system database,
+%% or the caller is an admin, or it is a publicly readable users database.
+all_docs_allowed(Db) ->
+    case couch_db:is_system_db(Db) of
+        false ->
+            true;
+        true ->
+            % any outcome other than ok (including a throw) means "not admin"
+            case (catch couch_db:check_is_admin(Db)) of
+                ok -> true;
+                _ -> is_public_users_db(Db)
+            end
+    end.
+
+%% true when Db is the configured authentication database, users_db_public
+%% is "true" and public_fields is configured.
+is_public_users_db(Db) ->
+    DbName = ?b2l(Db#db.name),
+    AuthDbName = couch_config:get("couch_httpd_auth",
+                                  "authentication_db",
+                                  "_users"),
+    case AuthDbName of
+        DbName ->
+            UsersDbPublic = couch_config:get("couch_httpd_auth", "users_db_public", "false"),
+            PublicFields = couch_config:get("couch_httpd_auth", "public_fields"),
+            UsersDbPublic =:= "true" andalso PublicFields =/= undefined;
+        _ ->
+            false
+    end.
+
+%% Run the _all_docs query for Req and stream the response.
+%%
+%% A preflight fun derives an ETag from the query signature and throws
+%% {etag_match, ETag} when the request already carries it. The row callback
+%% is chosen by get_view_callback/3.
+do_all_docs_req(Req, Db, Keys) ->
+    Args0 = parse_qs(Req, Keys),
+    CheckETag = fun(Sig, VAcc) ->
+        ETag = couch_httpd:make_etag(Sig),
+        case couch_httpd:etag_match(Req, ETag) of
+            true -> throw({etag_match, ETag});
+            false -> {ok, VAcc#vacc{etag=ETag}}
+        end
+    end,
+    Args = Args0#mrargs{preflight_fun=CheckETag},
+    RunQuery = fun() ->
+        DbName = ?b2l(Db#db.name),
+        UsersDbName = couch_config:get("couch_httpd_auth",
+                                         "authentication_db",
+                                         "_users"),
+        Callback = get_view_callback(DbName, UsersDbName, is_admin(Db)),
+        couch_mrview:query_all_docs(Db, Args, Callback, #vacc{db=Db, req=Req})
+    end,
+    {ok, Result} = couch_httpd:etag_maybe(Req, RunQuery),
+    % the query yields a #vacc{}; anything else is passed through unchanged
+    case Result of
+        #vacc{resp=HttpResp} -> {ok, HttpResp};
+        _ -> {ok, Result}
+    end.
+
+%% true when couch_db:check_is_admin/1 returns ok, false when it throws
+%% {unauthorized, _}. Any other failure propagates unchanged.
+is_admin(Db) ->
+    try couch_db:check_is_admin(Db) of
+        ok -> true
+    catch
+        throw:{unauthorized, _} -> false
+    end.
+
+
+% Pick the row callback for an _all_docs response. Only a non-admin reading
+% the users database gets the filtering callback; admins and every other
+% database get all fields.
+get_view_callback(DbName, UsersDbName, IsAdmin) ->
+    case {IsAdmin, DbName =:= UsersDbName} of
+        {false, true} -> fun filtered_view_cb/2;
+        _ -> fun view_cb/2
+    end.
+
+
+%% Query view ViewName of DDoc for Req and stream the response with
+%% view_cb/2.
+%%
+%% A preflight fun derives an ETag from the view signature and throws
+%% {etag_match, ETag} when the request already carries it.
+design_doc_view(Req, Db, DDoc, ViewName, Keys) ->
+    Args0 = parse_qs(Req, Keys),
+    CheckETag = fun(Sig, VAcc) ->
+        ETag = couch_httpd:make_etag(Sig),
+        case couch_httpd:etag_match(Req, ETag) of
+            true -> throw({etag_match, ETag});
+            false -> {ok, VAcc#vacc{etag=ETag}}
+        end
+    end,
+    Args = Args0#mrargs{preflight_fun=CheckETag},
+    RunQuery = fun() ->
+        InitVAcc = #vacc{db=Db, req=Req},
+        couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, InitVAcc)
+    end,
+    {ok, Result} = couch_httpd:etag_maybe(Req, RunQuery),
+    % the query yields a #vacc{}; anything else is passed through unchanged
+    case Result of
+        #vacc{resp=HttpResp} -> {ok, HttpResp};
+        _ -> {ok, Result}
+    end.
+
+
+%% Like view_cb/2, but the doc member of every row is first passed through
+%% couch_users_db:strip_non_public_fields/1. A null doc and all other row
+%% members are left untouched; non-row events go straight to view_cb/2.
+filtered_view_cb({row, Row0}, Acc) ->
+    StripDoc = fun
+        ({doc, null}=Prop) ->
+            Prop;
+        ({doc, Body}) ->
+            Stripped = couch_users_db:strip_non_public_fields(#doc{body=Body}),
+            {doc, Stripped#doc.body};
+        (Prop) ->
+            Prop
+    end,
+    view_cb({row, lists:map(StripDoc, Row0)}, Acc);
+filtered_view_cb(Event, Acc) ->
+    view_cb(Event, Acc).
+
+
+%% Callback that streams view events as a chunked JSON response.
+%%
+%%   {meta, Meta} - starts the response and sends the total_rows, offset
+%%                  and update_seq members that are present in Meta
+%%   {row, Row}   - sends one row; starts the response first if no meta
+%%                  event preceded it
+%%   complete     - closes the rows array, or sends an empty result when
+%%                  nothing was sent at all
+%%
+%% Always returns {ok, Acc}. Every clause updates the accumulator it was
+%% given, so db, req and etag are preserved throughout.
+view_cb({meta, Meta}, #vacc{resp=undefined}=Acc) ->
+    Headers = [{"ETag", Acc#vacc.etag}],
+    {ok, Resp} = couch_httpd:start_json_response(Acc#vacc.req, 200, Headers),
+    % Map function starting
+    Parts = case couch_util:get_value(total, Meta) of
+        undefined -> [];
+        Total -> [io_lib:format("\"total_rows\":~p", [Total])]
+    end ++ case couch_util:get_value(offset, Meta) of
+        undefined -> [];
+        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+    end ++ case couch_util:get_value(update_seq, Meta) of
+        undefined -> [];
+        UpdateSeq -> [io_lib:format("\"update_seq\":~p", [UpdateSeq])]
+    end ++ ["\"rows\":["],
+    Chunk = lists:flatten("{" ++ string:join(Parts, ",") ++ "\r\n"),
+    couch_httpd:send_chunk(Resp, Chunk),
+    % the first row needs no separator in front of it
+    {ok, Acc#vacc{resp=Resp, prepend=""}};
+view_cb({row, Row}, #vacc{resp=undefined}=Acc) ->
+    % Reduce function starting
+    Headers = [{"ETag", Acc#vacc.etag}],
+    {ok, Resp} = couch_httpd:start_json_response(Acc#vacc.req, 200, Headers),
+    couch_httpd:send_chunk(Resp, ["{\"rows\":[\r\n", row_to_json(Row)]),
+    % keep the incoming accumulator instead of building a fresh record
+    {ok, Acc#vacc{resp=Resp, prepend=",\r\n"}};
+view_cb({row, Row}, Acc) ->
+    % Adding another row
+    couch_httpd:send_chunk(Acc#vacc.resp, [Acc#vacc.prepend, row_to_json(Row)]),
+    {ok, Acc#vacc{prepend=",\r\n"}};
+view_cb(complete, #vacc{resp=undefined}=Acc) ->
+    % Nothing in view
+    {ok, Resp} = couch_httpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+    {ok, Acc#vacc{resp=Resp}};
+view_cb(complete, Acc) ->
+    % Finish view output
+    couch_httpd:send_chunk(Acc#vacc.resp, "\r\n]}"),
+    couch_httpd:end_json_response(Acc#vacc.resp),
+    {ok, Acc}.
+
+
+%% Encode a row as JSON, dispatching on the row's id member.
+row_to_json(Row) ->
+    row_to_json(couch_util:get_value(id, Row), Row).
+
+
+%% Encode a row as JSON.
+%%
+%% A row whose id is the atom error is rendered as {key, error}. Otherwise
+%% the object holds key and value, preceded by id and followed by doc when
+%% those members are present in the row.
+row_to_json(error, Row) ->
+    % Special case for _all_docs request with KEYS to
+    % match prior behavior.
+    Key = couch_util:get_value(key, Row),
+    Val = couch_util:get_value(value, Row),
+    ?JSON_ENCODE({[{key, Key}, {error, Val}]});
+row_to_json(Id0, Row) ->
+    IdProp = if
+        Id0 =:= undefined -> [];
+        true -> [{id, Id0}]
+    end,
+    Key = couch_util:get_value(key, Row, null),
+    Val = couch_util:get_value(value, Row),
+    DocProp = case couch_util:get_value(doc, Row) of
+        undefined -> [];
+        Doc -> [{doc, Doc}]
+    end,
+    ?JSON_ENCODE({IdProp ++ [{key, Key}, {value, Val}] ++ DocProp}).
+
+
+% Extract the `keys` member from a decoded JSON request body.
+%
+% Returns the list of keys, or `undefined` when the body has no `keys`
+% member (a debug line is logged in that case). Throws
+% {bad_request, Msg} when `keys` is present but is not a list.
+get_view_keys({Props}) ->
+    case couch_util:get_value(<<"keys">>, Props) of
+        undefined ->
+            ?LOG_DEBUG("POST with no keys member.", []),
+            undefined;
+        Keys when is_list(Keys) ->
+            Keys;
+        _ ->
+            % Message grammar corrected ("a array" -> "an array").
+            throw({bad_request, "`keys` member must be an array."})
+    end.
+
+
+% Build an #mrargs{} record from the request's query string, seeded
+% with the given Keys. Each {Name, Value} pair is applied in order
+% through parse_qs/3.
+parse_qs(Req, Keys) ->
+    ApplyParam = fun({Name, Value}, ArgsAcc) ->
+        parse_qs(Name, Value, ArgsAcc)
+    end,
+    lists:foldl(ApplyParam, #mrargs{keys=Keys}, couch_httpd:qs(Req)).
+
+
+% Apply a single query-string parameter (Key and Val are strings) to
+% the #mrargs{} record. Parameters without a dedicated clause are
+% stored as binaries in the `extra` field. Invalid values throw
+% {query_parse_error, Msg}.
+parse_qs("", _Val, Args) ->
+    Args;
+parse_qs("reduce", Val, Args) ->
+    Args#mrargs{reduce=parse_boolean(Val)};
+parse_qs("key", Val, Args) ->
+    % A single key is a range that starts and ends on the same key.
+    JsonKey = ?JSON_DECODE(Val),
+    Args#mrargs{start_key=JsonKey, end_key=JsonKey};
+parse_qs("keys", Val, Args) ->
+    Args#mrargs{keys=?JSON_DECODE(Val)};
+parse_qs(Key, Val, Args) when Key =:= "startkey"; Key =:= "start_key" ->
+    Args#mrargs{start_key=?JSON_DECODE(Val)};
+parse_qs(Key, Val, Args)
+        when Key =:= "startkey_docid"; Key =:= "start_key_doc_id" ->
+    Args#mrargs{start_key_docid=list_to_binary(Val)};
+parse_qs(Key, Val, Args) when Key =:= "endkey"; Key =:= "end_key" ->
+    Args#mrargs{end_key=?JSON_DECODE(Val)};
+parse_qs(Key, Val, Args)
+        when Key =:= "endkey_docid"; Key =:= "end_key_doc_id" ->
+    Args#mrargs{end_key_docid=list_to_binary(Val)};
+parse_qs("limit", Val, Args) ->
+    Args#mrargs{limit=parse_pos_int(Val)};
+parse_qs("count", _Val, _Args) ->
+    throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+parse_qs("stale", Val, Args) when Val == "ok" ->
+    Args#mrargs{stale=ok};
+parse_qs("stale", Val, Args) when Val == "update_after" ->
+    Args#mrargs{stale=update_after};
+parse_qs("stale", _Val, _Args) ->
+    throw({query_parse_error, <<"Invalid value for `stale`.">>});
+parse_qs("descending", Val, Args) ->
+    Direction = case parse_boolean(Val) of
+        true -> rev;
+        _ -> fwd
+    end,
+    Args#mrargs{direction=Direction};
+parse_qs("skip", Val, Args) ->
+    Args#mrargs{skip=parse_pos_int(Val)};
+parse_qs("group", Val, Args) ->
+    Level = case parse_boolean(Val) of
+        true -> exact;
+        _ -> 0
+    end,
+    Args#mrargs{group_level=Level};
+parse_qs("group_level", Val, Args) ->
+    Args#mrargs{group_level=parse_pos_int(Val)};
+parse_qs("inclusive_end", Val, Args) ->
+    Args#mrargs{inclusive_end=parse_boolean(Val)};
+parse_qs("include_docs", Val, Args) ->
+    Args#mrargs{include_docs=parse_boolean(Val)};
+parse_qs("attachments", Val, Args) ->
+    case parse_boolean(Val) of
+        true ->
+            Args#mrargs{doc_options=[attachments | Args#mrargs.doc_options]};
+        false ->
+            Args
+    end;
+parse_qs("att_encoding_info", Val, Args) ->
+    case parse_boolean(Val) of
+        true ->
+            Current = Args#mrargs.doc_options,
+            Args#mrargs{doc_options=[att_encoding_info | Current]};
+        false ->
+            Args
+    end;
+parse_qs("update_seq", Val, Args) ->
+    Args#mrargs{update_seq=parse_boolean(Val)};
+parse_qs("conflicts", Val, Args) ->
+    Args#mrargs{conflicts=parse_boolean(Val)};
+parse_qs("list", Val, Args) ->
+    Args#mrargs{list=list_to_binary(Val)};
+parse_qs("callback", Val, Args) ->
+    Args#mrargs{callback=list_to_binary(Val)};
+parse_qs(Key, Val, Args) ->
+    % Unrecognized parameter: keep it for downstream consumers.
+    Pair = {list_to_binary(Key), list_to_binary(Val)},
+    Args#mrargs{extra=[Pair | Args#mrargs.extra]}.
+
+
+% Parse a case-insensitive "true"/"false" string into a boolean.
+% Anything else throws {query_parse_error, Msg}.
+parse_boolean(Val) ->
+    Lowered = string:to_lower(Val),
+    if
+        Lowered =:= "true" ->
+            true;
+        Lowered =:= "false" ->
+            false;
+        true ->
+            Msg = io_lib:format("Invalid boolean parameter: ~p", [Val]),
+            throw({query_parse_error, ?l2b(Msg)})
+    end.
+
+
+% Parse a string into an integer. Any conversion failure is reported
+% as a thrown {query_parse_error, Msg}.
+parse_int(Val) ->
+    try list_to_integer(Val) of
+        IntVal ->
+            IntVal
+    catch
+        _:_ ->
+            Msg = io_lib:format("Invalid value for integer: ~p", [Val]),
+            throw({query_parse_error, ?l2b(Msg)})
+    end.
+
+
+% Parse a string into a non-negative integer (zero is accepted).
+% Negative values throw {query_parse_error, Msg}; non-integers are
+% rejected by parse_int/1.
+parse_pos_int(Val) ->
+    IntVal = parse_int(Val),
+    if
+        IntVal >= 0 ->
+            IntVal;
+        true ->
+            Msg = io_lib:format("Invalid value for positive integer: ~p",
+                                [Val]),
+            throw({query_parse_error, ?l2b(Msg)})
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_index.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_index.erl b/apps/couch_mrview/src/couch_mrview_index.erl
new file mode 100644
index 0000000..7506f34
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_index.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_index).
+
+
+-export([get/2]).
+-export([init/2, open/2, close/1, reset/1, delete/1]).
+-export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([compact/3, swap_compacted/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+% Look up a property of the view index state.
+%
+% Plain properties return the bare value; `info` returns {ok, Props}.
+% An unrecognized property throws {unknown_index_property, Property}.
+get(db_name, State) ->
+    State#mrst.db_name;
+get(idx_name, State) ->
+    State#mrst.idx_name;
+get(signature, State) ->
+    State#mrst.sig;
+get(update_seq, State) ->
+    State#mrst.update_seq;
+get(purge_seq, State) ->
+    State#mrst.purge_seq;
+get(update_options, State) ->
+    DesignOpts = State#mrst.design_opts,
+    IncDesign = couch_util:get_value(<<"include_design">>, DesignOpts, false),
+    LocalSeq = couch_util:get_value(<<"local_seq">>, DesignOpts, false),
+    % Only the exact atom `true` enables an option.
+    DesignPart = case IncDesign of
+        true -> [include_design];
+        _ -> []
+    end,
+    SeqPart = case LocalSeq of
+        true -> [local_seq];
+        _ -> []
+    end,
+    DesignPart ++ SeqPart;
+get(info, State) ->
+    #mrst{
+        fd = Fd,
+        sig = Sig,
+        id_btree = IdBtree,
+        language = Lang,
+        update_seq = UpdateSeq,
+        purge_seq = PurgeSeq,
+        views = Views
+    } = State,
+    {ok, DiskSize} = couch_file:bytes(Fd),
+    {ok, DataSize} = couch_mrview_util:calculate_data_size(IdBtree, Views),
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    {ok, [
+        {signature, HexSig},
+        {language, Lang},
+        {disk_size, DiskSize},
+        {data_size, DataSize},
+        {update_seq, UpdateSeq},
+        {purge_seq, PurgeSeq}
+    ]};
+get(Other, _State) ->
+    throw({unknown_index_property, Other}).
+
+
+% Build the initial index state from a design document, tagged with
+% the name of the database it belongs to.
+init(Db, DDoc) ->
+    DbName = couch_db:name(Db),
+    couch_mrview_util:ddoc_to_mrst(DbName, DDoc).
+
+
+% Open the index file for this view group.
+%
+% Returns {ok, NewState} with a fresh ref counter over the file, or
+% {error, Reason} (after logging) when the file cannot be opened. The
+% index is reset when the stored header signature matches neither the
+% current signature nor the pre-upgrade one.
+open(Db, State) ->
+    #mrst{
+        db_name=DbName,
+        sig=Sig
+    } = State,
+    IndexFName = couch_mrview_util:index_file(DbName, Sig),
+
+    % If we are upgrading from <=1.2.x, we upgrade the view
+    % index file on the fly, avoiding an index reset.
+    %
+    % OldSig is `ok` if no upgrade happened.
+    %
+    % To remove support for 1.2.x auto-upgrades in the
+    % future, just remove the next line and the code
+    % between "upgrade code for <= 1.2.x" and
+    % "end upgrade code for <= 1.2.x" and the corresponding
+    % code in couch_mrview_util
+
+    OldSig = couch_mrview_util:maybe_update_index_file(State),
+
+    case couch_mrview_util:open_file(IndexFName) of
+        {ok, Fd} ->
+            NewSt = case (catch couch_file:read_header(Fd)) of
+                % upgrade code for <= 1.2.x
+                {ok, {OldSig, Header}} ->
+                    % Matching (pre-upgrade) view signature.
+                    couch_mrview_util:init_state(Db, Fd, State, Header);
+                % end of upgrade code for <= 1.2.x
+                {ok, {Sig, Header}} ->
+                    % Matching view signature.
+                    couch_mrview_util:init_state(Db, Fd, State, Header);
+                _ ->
+                    % Unreadable header or different signature.
+                    couch_mrview_util:reset_index(Db, Fd, State)
+            end,
+            {ok, RefCounter} = couch_ref_counter:start([Fd]),
+            {ok, NewSt#mrst{refc=RefCounter}};
+        {error, Reason} = Error ->
+            ?LOG_ERROR("Failed to open view file '~s': ~s",
+                       [IndexFName, file:format_error(Reason)]),
+            Error
+    end.
+
+
+% Close the index file held by the state.
+close(State) ->
+    Fd = State#mrst.fd,
+    couch_file:close(Fd).
+
+
+% Close the index file, then remove the index files for this
+% database/signature. Failures while deleting are deliberately
+% swallowed by the `catch`.
+delete(#mrst{db_name=DbName, sig=Sig, fd=Fd}) ->
+    couch_file:close(Fd),
+    catch couch_mrview_util:delete_files(DbName, Sig).
+
+
+% Reset the index to an empty state, opening the database only for
+% the duration of the reset.
+reset(State) ->
+    ResetFun = fun(Db) ->
+        {ok, couch_mrview_util:reset_index(Db, State#mrst.fd, State)}
+    end,
+    couch_util:with_db(State#mrst.db_name, ResetFun).
+
+
+% Index callback: delegates to couch_mrview_updater:start_update/3.
+start_update(PartialDest, IdxState, ChangeCount) ->
+    couch_mrview_updater:start_update(PartialDest, IdxState, ChangeCount).
+
+
+% Index callback: delegates to couch_mrview_updater:purge/4.
+purge(Db, PurgeSeq, IdRevs, IdxState) ->
+    couch_mrview_updater:purge(Db, PurgeSeq, IdRevs, IdxState).
+
+
+% Index callback: delegates to couch_mrview_updater:process_doc/3.
+process_doc(Doc, Seq, IdxState) ->
+    couch_mrview_updater:process_doc(Doc, Seq, IdxState).
+
+
+% Index callback: delegates to couch_mrview_updater:finish_update/1.
+finish_update(IdxState) ->
+    couch_mrview_updater:finish_update(IdxState).
+
+
+% Write the current header to the index file. The header is stored
+% as {Signature, Header}, which is the shape open/2 matches on.
+commit(#mrst{} = State) ->
+    Sig = State#mrst.sig,
+    Body = couch_mrview_util:make_header(State),
+    couch_file:write_header(State#mrst.fd, {Sig, Body}).
+
+
+% Index callback: delegates to couch_mrview_compactor:compact/3.
+compact(Db, IdxState, Options) ->
+    couch_mrview_compactor:compact(Db, IdxState, Options).
+
+
+% Index callback: delegates to couch_mrview_compactor:swap_compacted/2.
+swap_compacted(Previous, Compacted) ->
+    couch_mrview_compactor:swap_compacted(Previous, Compacted).
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_show.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_show.erl b/apps/couch_mrview/src/couch_mrview_show.erl
new file mode 100644
index 0000000..4ab8e17
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_show.erl
@@ -0,0 +1,368 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_show).
+
+-export([
+    handle_doc_show_req/3,
+    handle_doc_update_req/3,
+    handle_view_list_req/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+% Accumulator threaded through list_cb/2 while a _list response is
+% being produced. All fields default to `undefined`.
+-record(lacc, {
+    db,       % database, set in handle_view_list/7
+    req,      % the #httpd{} request, set in handle_view_list/7
+    resp,     % chunked response; stays undefined until the first chunk
+    qserver,  % query server process, matched as {Proc, _} when prompted
+    lname,    % name of the list function
+    etag,     % ETag computed by the preflight fun in handle_view_list/7
+    code,     % HTTP status from fixup_headers/2; undefined = not started
+    headers   % response headers computed by fixup_headers/2
+}).
+
+% /db/_design/foo/_show/bar/docid
+% show converts a json doc to a response of any content-type.
+% it looks up the doc and then passes it to the query server.
+% then it sends the response from the query server to the http client.
+
+% Open the document DocId (with conflicts), returning the #doc{} or
+% `nil` when the open throws {not_found, _}.
+%
+% Only the not_found throw is intercepted. Any other throw, error or
+% exit propagates unchanged to the caller, instead of being converted
+% into a case_clause crash as the previous `case catch` form did.
+maybe_open_doc(Db, DocId) ->
+    try couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]) of
+        #doc{} = Doc -> Doc
+    catch
+        throw:{not_found, _} -> nil
+    end.
+
+% /db/_design/foo/_show/bar/docid
+% A missing document is passed to the show function as nil.
+% Revs are not handled here because they are an internal api.
+handle_doc_show_req(#httpd{
+        path_parts=[_, _, _, _, ShowName, DocId]
+    }=Req, Db, DDoc) ->
+    OpenedDoc = maybe_open_doc(Db, DocId),
+    handle_doc_show(Req, Db, DDoc, ShowName, OpenedDoc, DocId);
+
+% /db/_design/foo/_show/bar/doc/id/with/slashes
+% The trailing path segments are joined back into one doc id.
+handle_doc_show_req(#httpd{
+        path_parts=[_, _, _, _, ShowName, FirstPart|MoreParts]
+    }=Req, Db, DDoc) ->
+    Segments = lists:map(fun(Part) -> ?b2l(Part) end, [FirstPart|MoreParts]),
+    FullDocId = ?l2b(string:join(Segments, "/")),
+    OpenedDoc = maybe_open_doc(Db, FullDocId),
+    handle_doc_show(Req, Db, DDoc, ShowName, OpenedDoc, FullDocId);
+
+% /db/_design/foo/_show/bar
+% With no docid the doc is nil.
+handle_doc_show_req(#httpd{
+        path_parts=[_, _, _, _, ShowName]
+    }=Req, Db, DDoc) ->
+    handle_doc_show(Req, Db, DDoc, ShowName, nil);
+
+% Any other path shape is rejected.
+handle_doc_show_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_error(Req, 404, <<"show_error">>, <<"Invalid path.">>).
+
+% Run a show function when the request path carries no doc id; the
+% id handed to the JSON request object is `null`.
+handle_doc_show(Req, Db, DDoc, ShowName, Doc) ->
+    NoDocId = null,
+    handle_doc_show(Req, Db, DDoc, ShowName, Doc, NoDocId).
+
+% Run the show function ShowName from DDoc against Doc and send its
+% result. The response is produced through couch_httpd:etag_respond/3
+% using an ETag derived from the doc, the design doc and the request.
+handle_doc_show(Req, Db, DDoc, ShowName, Doc, DocId) ->
+    Etag = show_etag(Req, Doc, DDoc, []),
+    Respond = fun() ->
+        ReqObj = couch_httpd_external:json_req_obj(Req, Db, DocId),
+        DocObj = couch_query_servers:json_doc(Doc),
+        FunPath = [<<"shows">>, ShowName],
+        [<<"resp">>, ShowResp] =
+            couch_query_servers:ddoc_prompt(DDoc, FunPath, [DocObj, ReqObj]),
+        Tagged = apply_etag(ShowResp, Etag),
+        couch_httpd_external:send_external_response(Req, Tagged)
+    end,
+    couch_httpd:etag_respond(Req, Etag, Respond).
+
+
+% Compute the ETag for a show response from the design doc's etag,
+% the document's etag (nil when there is no doc), the Accept header,
+% the requesting user's name and roles, and the caller's `More` term.
+show_etag(#httpd{user_ctx=UserCtx}=Req, Doc, DDoc, More) ->
+    Accept = couch_httpd:header_value(Req, "Accept"),
+    DocPart = if
+        Doc =:= nil -> nil;
+        true -> couch_httpd:doc_etag(Doc)
+    end,
+    DDocPart = couch_httpd:doc_etag(DDoc),
+    UserPart = {UserCtx#user_ctx.name, UserCtx#user_ctx.roles},
+    couch_httpd:make_etag({DDocPart, DocPart, Accept, UserPart, More}).
+
+% updates a doc based on a request
+% handle_doc_update_req(#httpd{method = 'GET'}=Req, _Db, _DDoc) ->
+%     % anything but GET
+%     send_method_not_allowed(Req, "POST,PUT,DELETE,ETC");
+
+% This call is creating a new doc using an _update function to
+% modify the provided request body.
+% /db/_design/foo/_update/bar
+handle_doc_update_req(#httpd{
+        path_parts=[_, _, _, _, UpdateName]
+    }=Req, Db, DDoc) ->
+    % No doc id in the path: the update function receives a nil doc.
+    send_doc_update_response(Req, Db, DDoc, UpdateName, nil, null);
+
+% /db/_design/foo/_update/bar/docid
+% A doc id containing slashes arrives as several path segments, which
+% are joined back together here.
+handle_doc_update_req(#httpd{
+        path_parts=[_, _, _, _, UpdateName | IdSegments]
+    }=Req, Db, DDoc) ->
+    Joined = string:join(lists:map(fun(S) -> ?b2l(S) end, IdSegments), "/"),
+    DocId = ?l2b(Joined),
+    ExistingDoc = maybe_open_doc(Db, DocId),
+    send_doc_update_response(Req, Db, DDoc, UpdateName, ExistingDoc, DocId);
+
+% Any other path shape is rejected.
+handle_doc_update_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_error(Req, 404, <<"update_error">>, <<"Invalid path.">>).
+
+% Run the update function UpdateName and send its response.
+%
+% When the query server returns a JSON object as the new document, it
+% is validated and saved, and the response gets code 201 plus the
+% X-Couch-Update-NewRev / X-Couch-Id headers. When it returns anything
+% else in the document slot, nothing is saved and the code is 200.
+send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
+    JsonReq = couch_httpd_external:json_req_obj(Req, Db, DocId),
+    JsonDoc = couch_query_servers:json_doc(Doc),
+    FunPath = [<<"updates">>, UpdateName],
+    Outcome = couch_query_servers:ddoc_prompt(DDoc, FunPath, [JsonDoc, JsonReq]),
+    ExtResp = case Outcome of
+        [<<"up">>, {DocProps}, {RespProps}] ->
+            FullCommit = couch_httpd:header_value(
+                Req, "X-Couch-Full-Commit", "false"),
+            UserCtxOpt = {user_ctx, Req#httpd.user_ctx},
+            Options = case FullCommit of
+                "true" -> [full_commit, UserCtxOpt];
+                _ -> [UserCtxOpt]
+            end,
+            NewDoc = couch_doc:from_json_obj({DocProps}),
+            couch_doc:validate_docid(NewDoc#doc.id),
+            {ok, NewRev} = couch_db:update_doc(Db, NewDoc, Options),
+            RevHeaders = {[
+                {<<"X-Couch-Update-NewRev">>, couch_doc:rev_to_str(NewRev)},
+                {<<"X-Couch-Id">>, NewDoc#doc.id}
+            ]},
+            {[{<<"code">>, 201}, {<<"headers">>, RevHeaders} | RespProps]};
+        [<<"up">>, _NotADoc, {RespProps}] ->
+            {[{<<"code">>, 200} | RespProps]}
+    end,
+    % todo set location field
+    couch_httpd_external:send_external_response(Req, ExtResp).
+
+
+% Entry point for _list requests. GET and OPTIONS run without keys;
+% POST reads the optional `keys` member from the JSON body. Any other
+% method is answered with "method not allowed".
+handle_view_list_req(#httpd{method=Method}=Req, Db, DDoc)
+    when Method =:= 'GET' orelse Method =:= 'OPTIONS' ->
+    route_view_list(Req#httpd.path_parts, Req, Db, DDoc, undefined);
+handle_view_list_req(#httpd{method='POST'}=Req, Db, DDoc) ->
+    % The body is decoded before the path is inspected.
+    {Props} = couch_httpd:json_body_obj(Req),
+    Keys = proplists:get_value(<<"keys">>, Props),
+    route_view_list(Req#httpd.path_parts, Req, Db, DDoc, Keys);
+handle_view_list_req(Req, _Db, _DDoc) ->
+    couch_httpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+
+% Dispatch a _list request on the shape of its path.
+route_view_list([_, _, _DName, _, LName, VName], Req, Db, DDoc, Keys) ->
+    % Same design doc for view and list
+    handle_view_list(Req, Db, DDoc, LName, DDoc, VName, Keys);
+route_view_list([_, _, _, _, LName, DName, VName], Req, Db, DDoc, Keys) ->
+    % Different design docs for view and list
+    VDocId = <<"_design/", DName/binary>>,
+    {ok, VDDoc} = couch_db:open_doc(Db, VDocId, [ejson_body]),
+    handle_view_list(Req, Db, DDoc, LName, VDDoc, VName, Keys);
+route_view_list(_OtherPath, Req, _Db, _DDoc, _Keys) ->
+    couch_httpd:send_error(Req, 404, <<"list_error">>, <<"Bad path.">>).
+
+
+% Run list function LName (from DDoc) over view VName (from VDDoc),
+% or over _all_docs when VName is <<"_all_docs">>.
+%
+% A preflight fun computes the ETag from the view signature, the list
+% design doc, the Accept header and the user's name and roles. It
+% throws {etag_match, ETag} when the request already carries that ETag,
+% otherwise it stores the ETag in the accumulator.
+handle_view_list(Req, Db, DDoc, LName, VDDoc, VName, Keys) ->
+    Args0 = couch_mrview_http:parse_qs(Req, Keys),
+    Preflight = fun(BaseSig, Acc0) ->
+        UserCtx = Req#httpd.user_ctx,
+        User = {UserCtx#user_ctx.name, UserCtx#user_ctx.roles},
+        Accept = couch_httpd:header_value(Req, "Accept"),
+        Parts = {couch_httpd:doc_etag(DDoc), Accept, User},
+        ETag = couch_httpd:make_etag({BaseSig, Parts}),
+        case couch_httpd:etag_match(Req, ETag) of
+            true -> throw({etag_match, ETag});
+            false -> {ok, Acc0#lacc{etag=ETag}}
+        end
+    end,
+    Args = Args0#mrargs{preflight_fun=Preflight},
+    RunQuery = fun(QServer) ->
+        Acc = #lacc{db=Db, req=Req, qserver=QServer, lname=LName},
+        case VName of
+            <<"_all_docs">> ->
+                couch_mrview:query_all_docs(Db, Args, fun list_cb/2, Acc);
+            _ ->
+                couch_mrview:query_view(
+                    Db, VDDoc, VName, Args, fun list_cb/2, Acc)
+        end
+    end,
+    couch_httpd:etag_maybe(Req, fun() ->
+        couch_query_servers:with_ddoc_proc(DDoc, RunQuery)
+    end).
+
+
+% View fold callback that feeds rows to a list function.
+%
+% `code=undefined` in the accumulator means the list function has not
+% been started yet; start_list_resp/2 sets the code via fixup_headers/2.
+%
+%   {meta, Meta} - start the list function with total_rows / offset /
+%                  update_seq (whichever are present) as the head.
+%   {row, Row}   - start the list function with an empty head if needed,
+%                  then send the row.
+%   complete     - start the list function if it was never started,
+%                  prompt it for its tail, send the final chunk.
+%                  Returns {ok, Resp}.
+list_cb({meta, Meta}, #lacc{code=undefined} = Acc) ->
+    MetaProps = case couch_util:get_value(total, Meta) of
+        undefined -> [];
+        Total -> [{total_rows, Total}]
+    end ++ case couch_util:get_value(offset, Meta) of
+        undefined -> [];
+        Offset -> [{offset, Offset}]
+    end ++ case couch_util:get_value(update_seq, Meta) of
+        undefined -> [];
+        UpdateSeq -> [{update_seq, UpdateSeq}]
+    end,
+    start_list_resp({MetaProps}, Acc);
+list_cb({row, Row}, #lacc{code=undefined} = Acc) ->
+    {ok, NewAcc} = start_list_resp({[]}, Acc),
+    send_list_row(Row, NewAcc);
+list_cb({row, Row}, Acc) ->
+    send_list_row(Row, Acc);
+list_cb(complete, Acc0) ->
+    % The old test was `resp =:= nil`, but resp defaults to `undefined`
+    % and is never set to nil, so the list function was never started
+    % here. Use the same "not started" marker as the clauses above, and
+    % keep the whole started accumulator (code and headers included).
+    Acc = case Acc0 of
+        #lacc{code=undefined} ->
+            {ok, Started} = start_list_resp({[]}, Acc0),
+            Started;
+        _ ->
+            Acc0
+    end,
+    #lacc{qserver = {Proc, _}} = Acc,
+    Acc2 = case couch_query_servers:proc_prompt(Proc, [<<"list_end">>]) of
+        [<<"end">>, Data, Headers] ->
+            send_non_empty_chunk(fixup_headers(Headers, Acc), Data);
+        [<<"end">>, Data] ->
+            send_non_empty_chunk(Acc, Data)
+    end,
+    #lacc{resp = Resp2} = Acc2,
+    couch_httpd:last_chunk(Resp2),
+    {ok, Resp2}.
+
+% Start the list function with the given head, then apply the headers
+% it returns and send its first chunk (if any).
+% Returns {ok, UpdatedAcc}.
+start_list_resp(Head, #lacc{} = Acc) ->
+    #lacc{db=Db, req=Req, qserver=QServer, lname=LName} = Acc,
+    JsonReq = couch_httpd_external:json_req_obj(Req, Db),
+    FunPath = [<<"lists">>, LName],
+    [<<"start">>, FirstChunk, RespHeaders] =
+        couch_query_servers:ddoc_proc_prompt(QServer, FunPath, [Head, JsonReq]),
+    WithHeaders = fixup_headers(RespHeaders, Acc),
+    {ok, send_non_empty_chunk(WithHeaders, FirstChunk)}.
+
+% Stamp the accumulator's ETag onto the list function's response
+% object, then store the resulting status code and headers (with the
+% content type applied) in the accumulator.
+fixup_headers(Headers, #lacc{etag=ETag} = Acc) ->
+    Tagged = apply_etag(Headers, ETag),
+    #extern_resp_args{
+        code = StatusCode,
+        ctype = ContentType,
+        headers = RawHeaders
+    } = couch_httpd_external:parse_external_response(Tagged),
+    Final = couch_httpd_external:default_or_content_type(
+        ContentType, RawHeaders),
+    Acc#lacc{code=StatusCode, headers=Final}.
+
+% Hand one view row to the list function and forward what it emits.
+%
+% Returns {ok, Acc} to keep folding, or {stop, Acc} when the list
+% function ended the response or threw. On a throw the error is sent
+% as a chunked error, starting the response first if no chunk had been
+% sent yet. Only the prompt itself is protected by the try.
+send_list_row(Row, #lacc{qserver = {Proc, _}, resp = Resp} = Acc) ->
+    % Keep only the fields present in the row, in id/key/value/doc order.
+    Fields = [{K, couch_util:get_value(K, Row)} || K <- [id, key, value, doc]],
+    RowObj = [Field || {_, V} = Field <- Fields, V =/= undefined],
+    try couch_query_servers:proc_prompt(Proc, [<<"list_row">>, {RowObj}]) of
+    [<<"chunks">>, Chunk, Headers] ->
+        {ok, send_non_empty_chunk(fixup_headers(Headers, Acc), Chunk)};
+    [<<"chunks">>, Chunk] ->
+        {ok, send_non_empty_chunk(Acc, Chunk)};
+    [<<"end">>, Chunk, Headers] ->
+        Done = send_non_empty_chunk(fixup_headers(Headers, Acc), Chunk),
+        couch_httpd:last_chunk(Done#lacc.resp),
+        {stop, Done};
+    [<<"end">>, Chunk] ->
+        Done = send_non_empty_chunk(Acc, Chunk),
+        couch_httpd:last_chunk(Done#lacc.resp),
+        {stop, Done}
+    catch throw:Error ->
+        ErrAcc = case Resp of
+            undefined ->
+                % Nothing sent yet: open the response with the error code.
+                {Code, _, _} = couch_httpd:error_info(Error),
+                {ok, NewResp} = couch_httpd:start_chunked_response(
+                    Acc#lacc.req, Code, Acc#lacc.headers),
+                Acc#lacc{resp=NewResp, code=Code};
+            _ ->
+                Acc
+        end,
+        couch_httpd:send_chunked_error(ErrAcc#lacc.resp, Error),
+        {stop, ErrAcc}
+    end.
+
+% Send Chunk on the accumulator's response, starting the chunked
+% response first (with the stored code and headers) if it has not
+% been started. An empty chunk list sends nothing and starts nothing.
+send_non_empty_chunk(Acc, []) ->
+    Acc;
+send_non_empty_chunk(#lacc{resp=undefined} = Acc, Chunk) ->
+    {ok, Started} = couch_httpd:start_chunked_response(
+        Acc#lacc.req, Acc#lacc.code, Acc#lacc.headers),
+    couch_httpd:send_chunk(Started, Chunk),
+    Acc#lacc{resp = Started};
+send_non_empty_chunk(#lacc{resp=Open} = Acc, Chunk) ->
+    couch_httpd:send_chunk(Open, Chunk),
+    Acc.
+
+
+% Force our Etag and Vary headers onto an external response object.
+%
+% We need to control these two headers ourselves: if the external
+% function controlled the Etag, we would have to run it just to check
+% for a match, which sort of defeats the purpose.
+apply_etag({ExternalResponse}, CurrentEtag) ->
+    case couch_util:get_value(<<"headers">>, ExternalResponse, nil) of
+    nil ->
+        % No headers member at all: add one holding just ours.
+        Ours = {[{<<"Etag">>, CurrentEtag}, {<<"Vary">>, <<"Accept">>}]},
+        {[{<<"headers">>, Ours} | ExternalResponse]};
+    JsonHeaders ->
+        Rewrite = fun
+            ({<<"headers">>, Hs}) when Hs =:= JsonHeaders ->
+                % Replace or add our two headers in the existing object.
+                Etagged = json_apply_field({<<"Etag">>, CurrentEtag}, Hs),
+                Varied = json_apply_field({<<"Vary">>, <<"Accept">>}, Etagged),
+                {<<"headers">>, Varied};
+            (Field) ->
+                % Every other field passes through untouched.
+                Field
+        end,
+        {lists:map(Rewrite, ExternalResponse)}
+    end.
+
+
+% Maybe this is in the proplists API
+% todo move to couch_util
+% Set field {Key, Value} on a JSON object ({Props}), replacing any
+% existing entries for that key. Returns a JSON object.
+json_apply_field(Field, {Props}) ->
+    json_apply_field(Field, Props, []).
+
+
+% Drop every entry for Key from Headers, push the remaining entries
+% onto Acc (which reverses their order), and put {Key, NewValue} at
+% the front. Returns the result wrapped as a JSON object.
+json_apply_field({Key, NewValue}, Headers, Acc) ->
+    Kept = lists:foldl(fun
+        ({K, _OldVal}, Seen) when K =:= Key ->
+            % drop matching keys
+            Seen;
+        ({_OtherKey, _OtherVal} = Other, Seen) ->
+            % something else, leave it alone
+            [Other | Seen]
+    end, Acc, Headers),
+    {[{Key, NewValue} | Kept]}.
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_test_util.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_test_util.erl b/apps/couch_mrview/src/couch_mrview_test_util.erl
new file mode 100644
index 0000000..adc2675
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_test_util.erl
@@ -0,0 +1,92 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_test_util).
+
+-compile(export_all).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(ADMIN, #user_ctx{roles=[<<"_admin">>]}).
+
+
+% Create a test database holding the Type design doc and 10 documents.
+init_db(Name, Type) ->
+    DefaultCount = 10,
+    init_db(Name, Type, DefaultCount).
+
+
+% Create a test database holding the Type design doc and Count
+% generated documents. Returns the result of save_docs/2.
+init_db(Name, Type, Count) ->
+    {ok, FreshDb} = new_db(Name, Type),
+    save_docs(FreshDb, make_docs(Count)).
+
+
+% Delete any database called Name, create it again as admin, and
+% store the design doc for Type in it.
+new_db(Name, Type) ->
+    AdminOpts = [{user_ctx, ?ADMIN}],
+    couch_server:delete(Name, AdminOpts),
+    {ok, Db} = couch_db:create(Name, AdminOpts),
+    save_docs(Db, [ddoc(Type)]).
+
+
+% Write Docs to Db (asserting success) and return the result of
+% reopening the database.
+save_docs(Db, Docs) ->
+    {ok, _Results} = couch_db:update_docs(Db, Docs, []),
+    couch_db:reopen(Db).
+
+
+% Generate the documents doc(1) .. doc(Count), in ascending order.
+make_docs(Count) ->
+    NoDocsYet = [],
+    make_docs(Count, NoDocsYet).
+
+% Prepend doc(1) .. doc(Count) to Acc. A Count of zero or less
+% returns Acc unchanged.
+make_docs(Count, Acc) when Count =< 0 ->
+    Acc;
+make_docs(Count, Acc) ->
+    Generated = [doc(N) || N <- lists:seq(1, Count)],
+    Generated ++ Acc.
+
+
+% Build the `_design/bar` design document used by the tests.
+%   map - three map-only views: baz, bing and zing.
+%   red - one view, baz, with a map and a summing reduce.
+ddoc(map) ->
+    Baz = {<<"baz">>, {[
+        {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>}
+    ]}},
+    Bing = {<<"bing">>, {[
+        {<<"map">>, <<"function(doc) {}">>}
+    ]}},
+    Zing = {<<"zing">>, {[
+        {<<"map">>, <<
+            "function(doc) {\n"
+            "  if(doc.foo !== undefined)\n"
+            "    emit(doc.foo, 0);\n"
+            "}"
+        >>}
+    ]}},
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[Baz, Bing, Zing]}}
+    ]});
+ddoc(red) ->
+    MapSrc = <<
+        "function(doc) {\n"
+        "  emit([doc.val % 2, doc.val], doc.val);\n"
+        "}\n"
+    >>,
+    ReduceSrc = <<"function(keys, vals) {return sum(vals);}">>,
+    Baz = {<<"baz">>, {[
+        {<<"map">>, MapSrc},
+        {<<"reduce">>, ReduceSrc}
+    ]}},
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[Baz]}}
+    ]}).
+
+
+% Build a test document whose `_id` is the decimal string of Id and
+% whose `val` is the integer Id itself.
+doc(Id) ->
+    DocId = list_to_binary(integer_to_list(Id)),
+    Props = [{<<"_id">>, DocId}, {<<"val">>, Id}],
+    couch_doc:from_json_obj({Props}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_updater.erl b/apps/couch_mrview/src/couch_mrview_updater.erl
new file mode 100644
index 0000000..980b5cf
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview_updater.erl
@@ -0,0 +1,282 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_updater).
+
+-export([start_update/3, purge/4, process_doc/3, finish_update/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+% Begin an index update.
+%
+% Creates the document and write work queues, records them in the state
+% (together with Partial as partial_resp_pid and an empty doc_acc), and
+% spawns two linked processes: one that registers an `indexer` task with
+% NumChanges as total_changes and then runs map_docs/2, and one that runs
+% write_results/2. Both receive the calling process as their parent.
+%
+% Returns {ok, InitState}.
+start_update(Partial, State, NumChanges) ->
+    QueueOpts = [{max_size, 100000}, {max_items, 500}],
+    {ok, DocQueue} = couch_work_queue:new(QueueOpts),
+    {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
+
+    % An update sequence of 0 flags this run as the first build.
+    IsFirstBuild = State#mrst.update_seq == 0,
+    InitState = State#mrst{
+        first_build=IsFirstBuild,
+        partial_resp_pid=Partial,
+        doc_acc=[],
+        doc_queue=DocQueue,
+        write_queue=WriteQueue
+    },
+
+    Parent = self(),
+    Mapper = fun() ->
+        TaskProps = [
+            {type, indexer},
+            {database, State#mrst.db_name},
+            {design_document, State#mrst.idx_name},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, NumChanges}
+        ],
+        couch_task_status:add_task(TaskProps),
+        couch_task_status:set_update_frequency(500),
+        map_docs(Parent, InitState)
+    end,
+    Writer = fun() ->
+        write_results(Parent, InitState)
+    end,
+    spawn_link(Mapper),
+    spawn_link(Writer),
+
+    {ok, InitState}.
+
+
+% Remove the view rows contributed by purged documents.
+%
+% PurgedIdRevs is a list of {DocId, Revs} pairs; only the ids are used.
+% Each id is passed to couch_btree:query_modify/4 on the id btree as both a
+% lookup key and a removal key. The {RowKey, DocId} entries found that way
+% are removed from the corresponding view btrees. purge_seq is set to
+% PurgeSeq on the state and on every view whose btree changed.
+%
+% Returns {ok, NewState}.
+purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
+    #mrst{
+        views=Views0,
+        id_btree=IdBtree0
+    } = State,
+
+    PurgedIds = [DocId || {DocId, _Revs} <- PurgedIdRevs],
+    {ok, Found, IdBtree1} =
+        couch_btree:query_modify(IdBtree0, PurgedIds, [], PurgedIds),
+
+    % Group the {RowKey, DocId} pairs to delete by view number.
+    GroupFun = fun
+        ({not_found, _}, ByView) ->
+            ByView;
+        ({ok, {DocId, ViewNumRowKeys}}, ByView) ->
+            lists:foldl(fun({ViewNum, RowKey}, ByView1) ->
+                dict:append(ViewNum, {RowKey, DocId}, ByView1)
+            end, ByView, ViewNumRowKeys)
+    end,
+    RemByView = lists:foldl(GroupFun, dict:new(), Found),
+
+    PurgeView = fun(#mrview{id_num=Num, btree=Btree0}=View) ->
+        case dict:find(Num, RemByView) of
+            error ->
+                View;
+            {ok, RemKeys} ->
+                {ok, Btree1} = couch_btree:add_remove(Btree0, [], RemKeys),
+                % Keep the old purge_seq if nothing was actually removed.
+                ViewPurgeSeq = case Btree1 == Btree0 of
+                    true -> View#mrview.purge_seq;
+                    false -> PurgeSeq
+                end,
+                View#mrview{btree=Btree1, purge_seq=ViewPurgeSeq}
+        end
+    end,
+    Views1 = lists:map(PurgeView, Views0),
+
+    NewState = State#mrst{
+        id_btree=IdBtree1,
+        views=Views1,
+        purge_seq=PurgeSeq
+    },
+    {ok, NewState}.
+
+
+% Buffer one change for the mapper process.
+%
+% The change is pushed onto doc_acc as {Id, Seq, Doc}, as {Id, Seq, deleted}
+% for a deleted document, or as {nil, Seq, nil} when Doc is nil. If more
+% than 100 entries are already buffered they are first reversed (so the
+% oldest comes first), sent to the doc queue, and the buffer is reset.
+%
+% Returns {ok, NewState}.
+process_doc(Doc, Seq, #mrst{doc_acc=Pending}=State)
+        when length(Pending) > 100 ->
+    Batch = lists:reverse(Pending),
+    couch_work_queue:queue(State#mrst.doc_queue, Batch),
+    process_doc(Doc, Seq, State#mrst{doc_acc=[]});
+process_doc(nil, Seq, #mrst{doc_acc=Pending}=State) ->
+    Entry = {nil, Seq, nil},
+    {ok, State#mrst{doc_acc=[Entry | Pending]}};
+process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Pending}=State) ->
+    Entry = {Id, Seq, deleted},
+    {ok, State#mrst{doc_acc=[Entry | Pending]}};
+process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Pending}=State) ->
+    Entry = {Id, Seq, Doc},
+    {ok, State#mrst{doc_acc=[Entry | Pending]}}.
+
+
+% Finish an index update.
+%
+% Sends any still-buffered changes to the doc queue, closes that queue and
+% then blocks until a {new_state, NewState} message arrives (write_results/2
+% sends it to its parent once the write queue is closed).
+%
+% doc_acc is built by prepending, so it is reversed before queueing. This
+% hands the final batch to the mapper oldest first, exactly as process_doc/3
+% does for every earlier batch.
+%
+% Returns {ok, NewState} with the update-only fields reset: first_build,
+% partial_resp_pid, doc_acc, doc_queue and write_queue become undefined and
+% qserver becomes nil.
+finish_update(#mrst{doc_acc=Acc, doc_queue=DocQueue}=State) ->
+    case Acc of
+        [] ->
+            ok;
+        _ ->
+            couch_work_queue:queue(DocQueue, lists:reverse(Acc))
+    end,
+    couch_work_queue:close(State#mrst.doc_queue),
+    receive
+        {new_state, NewState} ->
+            {ok, NewState#mrst{
+                first_build=undefined,
+                partial_resp_pid=undefined,
+                doc_acc=undefined,
+                doc_queue=undefined,
+                write_queue=undefined,
+                qserver=nil
+            }}
+    end.
+
+
+% Mapper process loop.
+%
+% Repeatedly dequeues batches of {Id, Seq, Doc} entries from the doc queue,
+% maps them and queues {MaxSeq, [{Id, MapResult}]} on the write queue. The
+% query server is started on the first dequeue where qserver is still nil.
+% When the doc queue is closed, the query server is stopped and the write
+% queue is closed.
+map_docs(Parent, State0) ->
+    case couch_work_queue:dequeue(State0#mrst.doc_queue) of
+        {ok, Batches} ->
+            State1 = case State0#mrst.qserver of
+                nil -> start_query_server(State0);
+                _Started -> State0
+            end,
+            QServer = State1#mrst.qserver,
+            % nil entries only advance the sequence; deleted documents
+            % yield an empty result; everything else goes to the query
+            % server.
+            MapOne = fun
+                ({nil, Seq, _}, {MaxSeq, Mapped}) ->
+                    {erlang:max(Seq, MaxSeq), Mapped};
+                ({Id, Seq, deleted}, {MaxSeq, Mapped}) ->
+                    {erlang:max(Seq, MaxSeq), [{Id, []} | Mapped]};
+                ({Id, Seq, Doc}, {MaxSeq, Mapped}) ->
+                    {ok, Raw} = couch_query_servers:map_doc_raw(QServer, Doc),
+                    {erlang:max(Seq, MaxSeq), [{Id, Raw} | Mapped]}
+            end,
+            MapBatch = fun(Batch, BatchAcc) ->
+                update_task(length(Batch)),
+                lists:foldl(MapOne, BatchAcc, Batch)
+            end,
+            SeqAndResults = lists:foldl(MapBatch, {0, []}, Batches),
+            couch_work_queue:queue(State1#mrst.write_queue, SeqAndResults),
+            map_docs(Parent, State1);
+        closed ->
+            couch_query_servers:stop_doc_map(State0#mrst.qserver),
+            couch_work_queue:close(State0#mrst.write_queue)
+    end.
+
+
+% Writer process loop.
+%
+% Takes mapped results off the write queue, merges them into per-view
+% key/values, persists them with write_kvs/4 and passes the new state to
+% send_partial/2. When the write queue is closed, the current state is sent
+% to Parent as {new_state, State}.
+write_results(Parent, State) ->
+    case couch_work_queue:dequeue(State#mrst.write_queue) of
+        {ok, Info} ->
+            % One empty key/value bucket per view, in view order.
+            Buckets = [{View#mrview.id_num, []} || View <- State#mrst.views],
+            {Seq, ViewKVs, DocIdKeys} = merge_results(Info, 0, Buckets, []),
+            Written = write_kvs(State, Seq, ViewKVs, DocIdKeys),
+            send_partial(Written#mrst.partial_resp_pid, Written),
+            write_results(Parent, Written);
+        closed ->
+            Parent ! {new_state, State}
+    end.
+
+
+% Start a map query server for the state's language, passing it the `def`
+% of every view and the state's lib, and return the state with qserver set.
+start_query_server(State) ->
+    #mrst{
+        views=Views,
+        lib=Lib,
+        language=Language
+    } = State,
+    ViewDefs = [Def || #mrview{def=Def} <- Views],
+    {ok, Started} = couch_query_servers:start_doc_map(Language, ViewDefs, Lib),
+    State#mrst{qserver=Started}.
+
+
+% Fold a list of {Seq, DocResults} entries into the accumulators.
+%
+% Every per-document result is merged through merge_results/3. Returns
+% {MaxSeq, ViewKVs, DocIdKeys}, where MaxSeq is the largest of the initial
+% sequence and all Seq values seen.
+merge_results([{Seq, DocResults} | Rest], MaxSeq, ViewKVs0, DocIdKeys0) ->
+    MergeOne = fun(DocResult, {VKVs, DIKs}) ->
+        merge_results(DocResult, VKVs, DIKs)
+    end,
+    {ViewKVs1, DocIdKeys1} =
+        lists:foldl(MergeOne, {ViewKVs0, DocIdKeys0}, DocResults),
+    merge_results(Rest, erlang:max(Seq, MaxSeq), ViewKVs1, DocIdKeys1);
+merge_results([], MaxSeq, ViewKVs, DocIdKeys) ->
+    {MaxSeq, ViewKVs, DocIdKeys}.
+
+
+% Merge the map output of a single document into the accumulators.
+%
+% An empty result only records {DocId, []} in DocIdKeys. Otherwise the raw
+% output is decoded with couch_query_servers:raw_to_ejson/1, each emitted
+% row is converted from a list to a tuple, and the rows are distributed
+% over the views by insert_results/5.
+%
+% Returns {ViewKVs, DocIdKeys}.
+merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
+    NoKeys = {DocId, []},
+    {ViewKVs, [NoKeys | DocIdKeys]};
+merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
+    Decoded = couch_query_servers:raw_to_ejson(RawResults),
+    PerView = [[list_to_tuple(Row) || Row <- Rows] || Rows <- Decoded],
+    {NewViewKVs, IdKeys} = insert_results(DocId, PerView, ViewKVs, [], []),
+    {NewViewKVs, [IdKeys | DocIdKeys]}.
+
+
+% Distribute one document's emitted rows over the per-view accumulators.
+%
+% The second and third arguments are walked in lock step: the rows emitted
+% for a view and that view's {Id, KVs} accumulator. For each view the rows
+% are sorted, rows with the same key are collapsed into
+% {Key, {dups, Values}}, every key is recorded once as {Id, Key}, and the
+% rows are prepended to the accumulator as {{Key, DocId}, Value}.
+%
+% Returns {ViewKVs, {DocId, ViewIdKeys}} with ViewKVs in the original view
+% order.
+insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
+    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
+insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
+    % A key repeated in a clause head only matches when the incoming row has
+    % the same key as the row at the head of the accumulator.
+    Collapse = fun
+        ({K, V}, {[{K, {dups, Vs}} | Tail], IdKeys}) ->
+            {[{K, {dups, [V | Vs]}} | Tail], IdKeys};
+        ({K, V}, {[{K, Prev} | Tail], IdKeys}) ->
+            {[{K, {dups, [V, Prev]}} | Tail], IdKeys};
+        ({K, _}=Row, {Rows, IdKeys}) ->
+            {[Row | Rows], [{Id, K} | IdKeys]}
+    end,
+    Sorted = lists:sort(KVs),
+    {Collapsed, VIdKeys1} = lists:foldl(Collapse, {[], VIdKeys}, Sorted),
+    Tagged = lists:map(fun({RowKey, RowVal}) ->
+        {{RowKey, DocId}, RowVal}
+    end, Collapsed),
+    NewAcc = [{Id, Tagged ++ VKVs} | VKVAcc],
+    insert_results(DocId, RKVs, RVKVs, NewAcc, VIdKeys1).
+
+
+% Persist one batch of mapped results.
+%
+% Updates the id btree through update_id_btree/3, groups the entries it
+% returned by view with collapse_rem_keys/2, then applies the new key/values
+% and the removals to each view btree. A view's update_seq is set to
+% UpdateSeq only when its btree changed; the state's update_seq is always
+% set to UpdateSeq.
+%
+% ViewKVs must list {ViewId, KVs} in the same order as the state's views.
+% Returns the new state.
+write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
+    #mrst{
+        first_build=FirstBuild,
+        id_btree=IdBtree0,
+        views=Views0
+    } = State,
+
+    {ok, Removed, IdBtree1} = update_id_btree(IdBtree0, DocIdKeys, FirstBuild),
+    RemByView = collapse_rem_keys(Removed, dict:new()),
+
+    ApplyKVs = fun(#mrview{id_num=ViewId, btree=Btree0}=View, {ViewId, KVs}) ->
+        Stale = couch_util:dict_find(ViewId, RemByView, []),
+        {ok, Btree1} = couch_btree:add_remove(Btree0, KVs, Stale),
+        ViewSeq = case Btree1 =:= Btree0 of
+            true -> View#mrview.update_seq;
+            false -> UpdateSeq
+        end,
+        View#mrview{btree=Btree1, update_seq=ViewSeq}
+    end,
+    Views1 = lists:zipwith(ApplyKVs, Views0, ViewKVs),
+
+    State#mrst{
+        views=Views1,
+        update_seq=UpdateSeq,
+        id_btree=IdBtree1
+    }.
+
+
+% Apply a batch of {DocId, Keys} entries to the id btree.
+%
+% On a first build (third argument `true`) only the entries with non-empty
+% keys are inserted; nothing is looked up or removed. Otherwise every id is
+% looked up, entries with keys are inserted and ids whose key list is empty
+% are removed.
+%
+% Returns the result of couch_btree:query_modify/4.
+update_id_btree(Btree, DocIdKeys, true) ->
+    HasKeys = fun({_Id, Keys}) -> Keys /= [] end,
+    ToAdd = lists:filter(HasKeys, DocIdKeys),
+    couch_btree:query_modify(Btree, [], ToAdd, []);
+update_id_btree(Btree, DocIdKeys, _) ->
+    IsEmpty = fun({_Id, Keys}) -> Keys == [] end,
+    {Emptied, ToAdd} = lists:partition(IsEmpty, DocIdKeys),
+    ToRem = [Id || {Id, _} <- Emptied],
+    ToFind = [Id || {Id, _} <- DocIdKeys],
+    couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).
+
+
+% Group id-btree lookup results by view.
+%
+% For every {ok, {DocId, ViewIdKeys}} lookup, each {ViewId, Key} pair is
+% appended to the dict Acc as {Key, DocId} under ViewId. {not_found, _}
+% lookups are ignored. Returns the updated dict.
+collapse_rem_keys(Lookups, Acc) ->
+    AddLookup = fun
+        ({ok, {DocId, ViewIdKeys}}, ByView) ->
+            lists:foldl(fun({ViewId, Key}, ByView1) ->
+                dict:append(ViewId, {Key, DocId}, ByView1)
+            end, ByView, ViewIdKeys);
+        ({not_found, _}, ByView) ->
+            ByView
+    end,
+    lists:foldl(AddLookup, Acc, Lookups).
+
+
+% Cast {new_state, State} to Pid when Pid is a pid; for any other first
+% argument do nothing and return ok.
+send_partial(Pid, State) ->
+    case is_pid(Pid) of
+        true ->
+            gen_server:cast(Pid, {new_state, State});
+        false ->
+            ok
+    end.
+
+
+% Add NumChanges to the task's changes_done counter and recompute progress
+% as the integer percentage of total_changes. Progress is reported as 0
+% when total_changes is 0.
+update_task(NumChanges) ->
+    [DoneBefore, Total] = couch_task_status:get([changes_done, total_changes]),
+    DoneNow = DoneBefore + NumChanges,
+    Progress = if
+        Total =:= 0 ->
+            % updater restart after compaction finishes
+            0;
+        true ->
+            (DoneNow * 100) div Total
+    end,
+    couch_task_status:update([{progress, Progress}, {changes_done, DoneNow}]).


Mime
View raw message