Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1FE9175E0 for ; Fri, 31 Oct 2014 19:53:16 +0000 (UTC) Received: (qmail 1724 invoked by uid 500); 31 Oct 2014 19:53:16 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 1647 invoked by uid 500); 31 Oct 2014 19:53:16 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 1620 invoked by uid 99); 31 Oct 2014 19:53:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Oct 2014 19:53:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3DA0A927A75; Fri, 31 Oct 2014 19:53:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bbastian@apache.org To: commits@couchdb.apache.org Date: Fri, 31 Oct 2014 19:53:16 -0000 Message-Id: <695c7c23941a44a0938af77efbf99d8b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/41] couch-mrview commit: updated refs/heads/master to 28e51f3 Repository: couchdb-couch-mrview Updated Branches: refs/heads/master 1a6bd404e -> 28e51f3f8 add couch_mrview:view_changes_since/{6,7} function This function adds the ability to get the changes in a view since a given update sequence. You can get all changes in a view since a sequence, or all changes for a key or a range of keys in a view. The following new secondary indexes are created to allow this feature: - a generic log index that records the latest changes in views for a docid: {DocId, [{ViewId, {Key, Seq, OP}}]} where OP can be del or add. 
This index allows us to mark a key as removed if needed. It will be useful later to help us chain map/reduce operations and similar things. - a seq index associated with a view id: {ViewId, [{{Seq, Key}, {DocId, Val}}]} to look up all changes in a view - an index of keys by seq: {ViewId, [{{[Key, Seq], DocId}, Val}]}, to look up changes associated with a key or a range of keys Note: all deleted keys are marked as deleted (op del) in the log index, and their value in the seq indexes is {[{<<"_removed">>, true}]}. To start indexing changes you need to set the option "seq_indexed": true in the "options" object of the design document. Caveat: when the changes are indexed the size of the index is significantly larger. Example of usage: https://www.friendpaste.com/5Y6gihQReaxd8ERqbDom3y Conflicts: src/couch_mrview_updater.erl src/couch_mrview_util.erl Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/44c07c30 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/44c07c30 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/44c07c30 Branch: refs/heads/master Commit: 44c07c30ad2b39a528b9e3aa5aaa4ea1bc07088f Parents: 1a6bd40 Author: benoitc Authored: Sun Jan 26 23:56:09 2014 +0100 Committer: Benjamin Bastian Committed: Thu Oct 30 13:38:30 2014 -0700 ---------------------------------------------------------------------- include/couch_mrview.hrl | 7 +- src/couch_mrview.erl | 48 ++++++++++ src/couch_mrview_index.erl | 7 +- src/couch_mrview_updater.erl | 184 +++++++++++++++++++++++++++++-------- src/couch_mrview_util.erl | 186 ++++++++++++++++++++++++++++++++------ 5 files changed, 364 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/include/couch_mrview.hrl ---------------------------------------------------------------------- diff --git a/include/couch_mrview.hrl 
b/include/couch_mrview.hrl index 36c35d6..f1911db 100644 --- a/include/couch_mrview.hrl +++ b/include/couch_mrview.hrl @@ -18,12 +18,13 @@ idx_name, language, design_opts=[], + seq_indexed=false, lib, views, id_btree=nil, + log_btree=nil, update_seq=0, purge_seq=0, - first_build, partial_resp_pid, doc_acc, @@ -41,6 +42,9 @@ reduce_funs=[], def, btree=nil, + seq_btree=nil, + key_byseq_btree=nil, + seq_indexed=false, options=[] }). @@ -49,6 +53,7 @@ seq=0, purge_seq=0, id_btree_state=nil, + log_btree_state=nil, view_states=nil }). http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview.erl b/src/couch_mrview.erl index 047bc00..afb6f1e 100644 --- a/src/couch_mrview.erl +++ b/src/couch_mrview.erl @@ -15,6 +15,7 @@ -export([validate/2]). -export([query_all_docs/2, query_all_docs/4]). -export([query_view/3, query_view/4, query_view/6]). +-export([view_changes_since/6, view_changes_since/7]). -export([get_info/2]). -export([trigger_update/2, trigger_update/3]). -export([compact/2, compact/3, cancel_compaction/2]). @@ -135,6 +136,30 @@ query_view(Db, {Type, View, Ref}, Args, Callback, Acc) -> erlang:demonitor(Ref, [flush]) end. +view_changes_since(Db, DDoc, VName, StartSeq, Fun, Acc) -> + view_changes_since(Db, DDoc, VName, StartSeq, Fun, [], Acc). 
+ +view_changes_since(Db, DDoc, VName, StartSeq, Fun, Options, Acc) -> + Args0 = make_view_changes_args(Options), + {ok, {_, View}, _, Args} = couch_mrview_util:get_view(Db, DDoc, VName, + Args0), + case View#mrview.seq_indexed of + true -> + OptList = make_view_changes_opts(StartSeq, Options, Args), + Btree = case is_key_byseq(Options) of + true -> View#mrview.key_byseq_btree; + _ -> View#mrview.seq_btree + end, + io:format("opt list ~p~n", [OptList]), + AccOut = lists:foldl(fun(Opts, Acc0) -> + {ok, _R, A} = couch_mrview_util:fold_changes( + Btree, Fun, Acc0, Opts), + A + end, Acc, OptList), + {ok, AccOut}; + _ -> + {error, seqs_not_indexed} + end. get_info(Db, DDocId) when is_binary(DDocId) -> DbName = mem3:dbname(Db#db.name), @@ -447,3 +472,26 @@ lookup_index(Key) -> record_info(fields, mrargs), lists:seq(2, record_info(size, mrargs)) ), couch_util:get_value(Key, Index). + + +is_key_byseq(Options) -> + lists:any(fun({K, _}) -> + lists:member(K, [start_key, end_key, start_key_docid, + end_key_docid, keys]) + end, Options). + +make_view_changes_args(Options) -> + case is_key_byseq(Options) of + true -> + to_mrargs(Options); + false -> + #mrargs{} + end. + +make_view_changes_opts(StartSeq, Options, Args) -> + case is_key_byseq(Options) of + true -> + couch_mrview_util:changes_key_opts(StartSeq, Args); + false -> + [[{start_key, {StartSeq+1, <<>>}}] ++ Options] + end. 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_index.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview_index.erl b/src/couch_mrview_index.erl index 2d6ccae..0b9b236 100644 --- a/src/couch_mrview_index.erl +++ b/src/couch_mrview_index.erl @@ -51,14 +51,17 @@ get(Property, State) -> #mrst{ fd = Fd, sig = Sig, - id_btree = Btree, + id_btree = IdBtree, + log_btree = LogBtree, language = Lang, update_seq = UpdateSeq, purge_seq = PurgeSeq, views = Views } = State, {ok, FileSize} = couch_file:bytes(Fd), - {ok, ExternalSize} = couch_mrview_util:calculate_external_size(Views), + {ok, ExternalSize} = couch_mrview_util:calculate_external_size(IdBtree, + LogBtree, + Views), ActiveSize = ExternalSize + couch_btree:size(Btree), {ok, [ {signature, list_to_binary(couch_index_util:hexsig(Sig))}, http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_updater.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview_updater.erl b/src/couch_mrview_updater.erl index 1525a4c..eb06c92 100644 --- a/src/couch_mrview_updater.erl +++ b/src/couch_mrview_updater.erl @@ -141,12 +141,12 @@ map_docs(Parent, State0) -> ({nil, Seq, _}, {SeqAcc, Results}) -> {erlang:max(Seq, SeqAcc), Results}; ({Id, Seq, deleted}, {SeqAcc, Results}) -> - {erlang:max(Seq, SeqAcc), [{Id, []} | Results]}; + {erlang:max(Seq, SeqAcc), [{Id, Seq, []} | Results]}; ({Id, Seq, Doc}, {SeqAcc, Results}) -> couch_stats:increment_counter([couchdb, mrview, map_docs], 1), {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), - {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} + {erlang:max(Seq, SeqAcc), [{Id, Seq, Res} | Results]} end, FoldFun = fun(Docs, Acc) -> update_task(length(Docs)), @@ -162,8 +162,8 @@ write_results(Parent, State) -> case accumulate_writes(State, State#mrst.write_queue, nil) of stop -> Parent ! 
{new_state, State}; - {Go, {Seq, ViewKVs, DocIdKeys}} -> - NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys), + {Go, {Seq, ViewKVs, DocIdKeys, Log}} -> + NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Log), if Go == stop -> Parent ! {new_state, NewState}; true -> @@ -185,17 +185,17 @@ start_query_server(State) -> accumulate_writes(State, W, Acc0) -> - {Seq, ViewKVs, DocIdKVs} = case Acc0 of - nil -> {0, [{V#mrview.id_num, []} || V <- State#mrst.views], []}; + {Seq, ViewKVs, DocIdKVs, Log} = case Acc0 of + nil -> {0, [{V#mrview.id_num, {[], []}} || V <- State#mrst.views], [], dict:new()}; _ -> Acc0 end, case couch_work_queue:dequeue(W) of closed when Seq == 0 -> stop; closed -> - {stop, {Seq, ViewKVs, DocIdKVs}}; + {stop, {Seq, ViewKVs, DocIdKVs, Log}}; {ok, Info} -> - {_, _, NewIds} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs), + {_, _, NewIds, _} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs, Log), case accumulate_more(length(NewIds)) of true -> accumulate_writes(State, W, Acc); false -> {ok, Acc} @@ -212,66 +212,100 @@ accumulate_more(NumDocIds) -> andalso CurrMem < list_to_integer(MinSize). -merge_results([], SeqAcc, ViewKVs, DocIdKeys) -> - {SeqAcc, ViewKVs, DocIdKeys}; -merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) -> - Fun = fun(RawResults, {VKV, DIK}) -> - merge_results(RawResults, VKV, DIK) +merge_results([], SeqAcc, ViewKVs, DocIdKeys, Log) -> + {SeqAcc, ViewKVs, DocIdKeys, Log}; +merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys, Log) -> + Fun = fun(RawResults, {VKV, DIK, Log2}) -> + merge_results(RawResults, VKV, DIK, Log2) end, - {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results), - merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1). + {ViewKVs1, DocIdKeys1, Log1} = lists:foldl(Fun, {ViewKVs, DocIdKeys, Log}, + Results), + merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1, + Log1). 
-merge_results({DocId, []}, ViewKVs, DocIdKeys) -> - {ViewKVs, [{DocId, []} | DocIdKeys]}; -merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) -> +merge_results({DocId, _Seq, []}, ViewKVs, DocIdKeys, Log) -> + {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, [], Log)}; +merge_results({DocId, Seq, RawResults}, ViewKVs, DocIdKeys, Log) -> JsonResults = couch_query_servers:raw_to_ejson(RawResults), Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults], - {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []), - {ViewKVs1, [ViewIdKeys | DocIdKeys]}. + {ViewKVs1, ViewIdKeys, Log1} = insert_results(DocId, Seq, Results, ViewKVs, [], + [], Log), + {ViewKVs1, [ViewIdKeys | DocIdKeys], Log1}. -insert_results(DocId, [], [], ViewKVs, ViewIdKeys) -> - {lists:reverse(ViewKVs), {DocId, ViewIdKeys}}; -insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> +insert_results(DocId, _Seq, [], [], ViewKVs, ViewIdKeys, Log) -> + {lists:reverse(ViewKVs), {DocId, ViewIdKeys}, Log}; +insert_results(DocId, Seq, [KVs | RKVs], [{Id, {VKVs, SKVs}} | RVKVs], VKVAcc, + VIdKeys, Log) -> CombineDupesFun = fun - ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) -> - {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys}; - ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) -> - {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys}; - ({Key, _}=KV, {Rest, IdKeys}) -> - {[KV | Rest], [{Id, Key} | IdKeys]} + ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys, Log2}) -> + {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys, Log2}; + ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys, Log2}) -> + {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys, Log2}; + ({Key, _}=KV, {Rest, IdKeys, Log2}) -> + {[KV | Rest], [{Id, Key} | IdKeys], + dict:append(DocId, {Id, {Key, Seq, add}}, Log2)} end, - InitAcc = {[], VIdKeys}, + InitAcc = {[], VIdKeys, Log}, couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)), - {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, 
InitAcc, lists:sort(KVs)), + {Duped, VIdKeys0, Log1} = lists:foldl(CombineDupesFun, InitAcc, + lists:sort(KVs)), FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs, - insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0). + FinalSKVs = [{{Seq, Key}, {DocId, Val}} || {Key, Val} <- Duped] ++ SKVs, + insert_results(DocId, Seq, RKVs, RVKVs, + [{Id, {FinalKVs, FinalSKVs}} | VKVAcc], VIdKeys0, Log1). -write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> +write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Log) -> #mrst{ id_btree=IdBtree, + log_btree=LogBtree, first_build=FirstBuild } = State, {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild), ToRemByView = collapse_rem_keys(ToRemove, dict:new()), - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}) -> + {ok, SeqsToAdd, SeqsToRemove, LogBtree2} = case LogBtree of + nil -> {ok, undefined, undefined, nil}; + _ -> update_log(LogBtree, Log, UpdateSeq, FirstBuild) + end, + + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) -> ToRem = couch_util:dict_find(ViewId, ToRemByView, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem), NewUpdateSeq = case VBtree2 =/= View#mrview.btree of true -> UpdateSeq; _ -> View#mrview.update_seq end, - View#mrview{btree=VBtree2, update_seq=NewUpdateSeq} + + %% store the view changes. 
+ {SeqBtree2, KeyBySeqBtree2} = case View#mrview.seq_indexed of + true -> + SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []), + SToAdd = couch_util:dict_find(ViewId, SeqsToAdd, []), + SKVs1 = SKVs ++ SToAdd, + {ok, SBt} = couch_btree:add_remove(View#mrview.seq_btree, + SKVs1, SToRem), + + {ok, KSbt} = couch_btree:add_remove(View#mrview.key_byseq_btree, + couch_mrview_util:to_key_seq(SKVs1), + couch_mrview_util:to_key_seq(SToRem)), + {SBt, KSbt}; + _ -> {nil, nil} + end, + View#mrview{btree=VBtree2, + seq_btree=SeqBtree2, + key_byseq_btree=KeyBySeqBtree2, + update_seq=NewUpdateSeq} end, State#mrst{ views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs), update_seq=UpdateSeq, - id_btree=IdBtree2 + id_btree=IdBtree2, + log_btree=LogBtree2 }. @@ -284,6 +318,84 @@ update_id_btree(Btree, DocIdKeys, _) -> ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []], couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem). +walk_log(BTree, Fun, Acc, Ids) -> + WrapFun = fun(KV, _Offset, Acc2) -> + Fun(KV, Acc2) + end, + lists:foldl(fun(Id, Acc1) -> + Opt = [{start_key, Id}, {end_key, Id}], + {ok, _, A} = couch_btree:fold(BTree, WrapFun, Acc1, Opt), + A + end, Acc, Ids). 
+ +update_log(Btree, Log, _UpdatedSeq, true) -> + ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log), + DIKeys /= []], + {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []), + {ok, dict:new(), dict:new(), LogBtree2}; +update_log(Btree, Log, UpdatedSeq, _) -> + %% build list of updated keys and Id + {ToLook, Updated} = dict:fold(fun + (Id, [], {IdsAcc, KeysAcc}) -> + {[Id | IdsAcc], KeysAcc}; + (Id, DIKeys, {IdsAcc, KeysAcc}) -> + KeysAcc1 = lists:foldl(fun({ViewId, {Key, _Seq, _Op}}, + KeysAcc2) -> + [{Id, ViewId, Key} | KeysAcc2] + end, KeysAcc, DIKeys), + {[Id | IdsAcc], KeysAcc1} end, {[], []}, Log), + + io:format("updated ~p~n", [Updated]), + RemValue = {[{<<"_removed">>, true}]}, + {Log1, AddAcc, DelAcc} = walk_log(Btree, fun({DocId, VIdKeys}, + {Log2, AddAcc2, DelAcc2}) -> + + {Log3, AddAcc3, DelAcc3} = lists:foldl(fun({ViewId,{Key, Seq, Op}}, + {Log4, AddAcc4, DelAcc4}) -> + + case lists:member({DocId, ViewId, Key}, Updated) of + true -> + %% the log is updated, deleted old + %% record from the view + DelAcc5 = dict:append(ViewId, {Seq, Key}, + DelAcc4), + {Log4, AddAcc4, DelAcc5}; + false when Op /= del -> + %% an update operation has been + %% logged for this key. We must now + %% record it as deleted in the + %% log, remove the old record in + %% the view and update the view + %% with a removed record. + Log5 = dict:append(DocId, + {ViewId, + {Key,UpdatedSeq, del}}, + Log4), + DelAcc5 = dict:append(ViewId, {Seq, Key}, + DelAcc4), + AddAcc5 = dict:append(ViewId, + {{UpdatedSeq, Key}, + {DocId, RemValue}}, + AddAcc4), + {Log5, AddAcc5, DelAcc5}; + false -> + %% the key has already been + %% registered in the view as + %% deleted, make sure to add it + %% to the new log. 
+ Log5 = dict:append(DocId, + {ViewId, + {Key, Seq, del}}, Log4), + {Log5, AddAcc4, DelAcc4} + end + end, {Log2, AddAcc2, DelAcc2}, VIdKeys), + {ok, {Log3, AddAcc3, DelAcc3}} + end, {Log, dict:new(), dict:new()}, ToLook), + + ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log1), DIKeys /= []], + %% store the new logs + {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []), + {ok, AddAcc, DelAcc, LogBtree2}. collapse_rem_keys([], Acc) -> Acc; http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_util.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview_util.erl b/src/couch_mrview_util.erl index 7fc60f8..cd25887 100644 --- a/src/couch_mrview_util.erl +++ b/src/couch_mrview_util.erl @@ -21,13 +21,16 @@ -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]). -export([fold/4, fold_reduce/4]). -export([temp_view_to_ddoc/1]). --export([calculate_external_size/1]). +-export([calculate_external_size/3]). -export([validate_args/1]). -export([maybe_load_doc/3, maybe_load_doc/4]). -export([maybe_update_index_file/1]). -export([extract_view/4, extract_view_reduce/1]). -export([get_view_keys/1, get_view_queries/1]). -export([set_view_type/3]). +-export([changes_key_opts/2]). +-export([fold_changes/4]). +-export([to_key_seq/1]). -define(MOD, couch_mrview_index). 
@@ -91,15 +94,18 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> [Name, Else]), DictBySrcAcc end, + {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}), + SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false), + {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), BySrc = lists:foldl(MakeDict, dict:new(), RawViews), - NumViews = fun({_, View}, N) -> {View#mrview{id_num=N}, N+1} end, + NumViews = fun({_, View}, N) -> + {View#mrview{id_num=N, seq_indexed=SeqIndexed}, N+1} + end, {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))), Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), - {DesignOpts} = couch_util:get_value(<<"options">>, Fields, {[]}), - {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), IdxState = #mrst{ @@ -108,7 +114,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> lib=Lib, views=Views, language=Language, - design_opts=DesignOpts + design_opts=DesignOpts, + seq_indexed=SeqIndexed }, SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, {ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}. 
@@ -152,7 +159,8 @@ view_sig(Db, State, View, #mrargs{include_docs=true}=Args) -> BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}), UpdateSeq = couch_db:get_update_seq(Db), PurgeSeq = couch_db:get_purge_seq(Db), - Bin = term_to_binary({BaseSig, UpdateSeq, PurgeSeq}), + Bin = term_to_binary({BaseSig, UpdateSeq, PurgeSeq, + State#mrst.seq_indexed}), couch_index_util:hexsig(couch_util:md5(Bin)); view_sig(Db, State, {_Nth, _Lang, View}, Args) -> view_sig(Db, State, View, Args); @@ -160,11 +168,12 @@ view_sig(_Db, State, View, Args0) -> Sig = State#mrst.sig, UpdateSeq = View#mrview.update_seq, PurgeSeq = View#mrview.purge_seq, + SeqIndexed = View#mrview.seq_indexed, Args = Args0#mrargs{ preflight_fun=undefined, extra=[] }, - Bin = term_to_binary({Sig, UpdateSeq, PurgeSeq, Args}), + Bin = term_to_binary({Sig, UpdateSeq, PurgeSeq, SeqIndexed, Args}), couch_index_util:hexsig(couch_util:md5(Bin)). @@ -173,7 +182,8 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) -> seq=0, purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, - view_states=[{nil, 0, 0} || _ <- Views] + log_btree_state=nil, + view_states=[{nil, nil, nil, 0, 0} || _ <- Views] }, init_state(Db, Fd, State, Header); % read <= 1.2.x header record and transpile it to >=1.3.x @@ -187,25 +197,31 @@ init_state(Db, Fd, State, #index_header{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, - view_states=ViewStates + log_btree_state=nil, + view_states=[{Bt, nil, nil, USeq, PSeq} || {Bt, USeq, PSeq} <- ViewStates] }); init_state(Db, Fd, State, Header) -> - #mrst{language=Lang, views=Views} = State, + #mrst{language=Lang, views=Views, seq_indexed=SeqIndexed} = State, #mrheader{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, + log_btree_state=LogBtreeState, view_states=ViewStates } = Header, StateUpdate = fun - ({_, _, _}=St) -> St; - (St) -> {St, 0, 0} + ({_, _, _, _, _}=St) -> St; + (St) -> {St, nil, nil, 0, 0} end, ViewStates2 = lists:map(StateUpdate, ViewStates), IdBtOpts 
= [{compression, couch_db:compression(Db)}], {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts), + {ok, LogBtree} = case SeqIndexed of + true -> couch_btree:open(LogBtreeState, Fd, IdBtOpts); + false -> {ok, nil} + end, OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end, Views2 = lists:zipwith(OpenViewFun, ViewStates2, Views), @@ -216,11 +232,20 @@ init_state(Db, Fd, State, Header) -> update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + log_btree=LogBtree, views=Views2 }. +less_json_seqs({SeqA, JsonA}, {SeqB, JsonB}) -> + case couch_ejson_compare:less(SeqA, SeqB) of + 0 -> + couch_ejson_compare:less_json(JsonA, JsonB); + Result -> + Result < 0 + end. + -open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) -> +open_view(Db, Fd, Lang, {BTState, SeqBTState, KSeqBTState, USeq, PSeq}, View) -> FunSrcs = [FunSrc || {_Name, FunSrc} <- View#mrview.reduce_funs], ReduceFun = fun(reduce, KVs) -> @@ -245,7 +270,23 @@ open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) -> {compression, couch_db:compression(Db)} ], {ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts), - View#mrview{btree=Btree, update_seq=USeq, purge_seq=PSeq}. + + {SeqBtree, KeyBySeqBtree} = case View#mrview.seq_indexed of + true -> + ViewSeqBtOpts = [{less, fun less_json_seqs/2}, + {compression, couch_db:compression(Db)}], + {ok, SBt} = couch_btree:open(SeqBTState, Fd, ViewSeqBtOpts), + {ok, KSBt} = couch_btree:open(KSeqBTState, Fd, ViewBtOpts), + {SBt, KSBt}; + false -> + {nil, nil} + end, + + View#mrview{btree=Btree, + seq_btree=SeqBtree, + key_byseq_btree=KeyBySeqBtree, + update_seq=USeq, + purge_seq=PSeq}. temp_view_to_ddoc({Props}) -> @@ -300,7 +341,6 @@ fold(#mrview{btree=Bt}, Fun, Acc, Opts) -> end, {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts). - fold_fun(_Fun, [], _, Acc) -> {ok, Acc}; fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> @@ -311,6 +351,12 @@ fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> {stop, Acc2} end. 
+fold_changes(Bt, Fun, Acc, Opts) -> + WrapperFun = fun(KV, _Reds, Acc2) -> + Fun(changes_expand_dups([KV], []), Acc2) + end, + {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts). + fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) -> #mrview{ @@ -495,21 +541,34 @@ make_header(State) -> update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + log_btree=LogBtree, views=Views } = State, - ViewStates = [ - { - couch_btree:get_state(V#mrview.btree), - V#mrview.update_seq, - V#mrview.purge_seq - } - || - V <- Views - ], + + ViewStates = lists:foldr(fun(V, Acc) -> + {SeqBtState, KSeqBtState} = case V#mrview.seq_indexed of + true -> + {couch_btree:get_state(V#mrview.seq_btree), + couch_btree:get_state(V#mrview.key_byseq_btree)}; + _ -> {nil, nil} + end, + [{couch_btree:get_state(V#mrview.btree), + SeqBtState, + KSeqBtState, + V#mrview.update_seq, + V#mrview.purge_seq} | Acc] + end, [], Views), + + LogBtreeState = case LogBtree of + nil -> nil; + _ -> couch_btree:get_state(LogBtree) + end, + #mrheader{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=couch_btree:get_state(IdBtree), + log_btree_state= LogBtreeState, view_states=ViewStates }. @@ -567,7 +626,9 @@ reset_state(State) -> qserver=nil, update_seq=0, id_btree=nil, - views=[View#mrview{btree=nil} || View <- State#mrst.views] + log_btree=nil, + views=[View#mrview{btree=nil, seq_btree=nil, key_byseq_btree=nil} + || View <- State#mrst.views] }. @@ -636,11 +697,62 @@ reverse_key_default(<<255>>) -> <<>>; reverse_key_default(Key) -> Key. -calculate_external_size(Views) -> - SumFun = fun(#mrview{btree=Bt}, Acc) -> - sum_btree_sizes(Acc, couch_btree:size(Bt)) +changes_key_opts(StartSeq, Args) -> + changes_key_opts(StartSeq, Args, []). 
+ + +changes_key_opts(StartSeq, #mrargs{keys=undefined, direction=Dir}=Args, Extra) -> + [[{dir, Dir}] ++ changes_skey_opts(StartSeq, Args) ++ + changes_ekey_opts(StartSeq, Args) ++ Extra]; +changes_key_opts(StartSeq, #mrargs{keys=Keys, direction=Dir}=Args, Extra) -> + lists:map(fun(K) -> + [{dir, Dir}] + ++ changes_skey_opts(StartSeq, Args#mrargs{start_key=K}) + ++ changes_ekey_opts(StartSeq, Args#mrargs{end_key=K}) + ++ Extra + end, Keys). + + +changes_skey_opts(StartSeq, #mrargs{start_key=undefined}) -> + [{start_key, [<<>>, StartSeq+1]}]; +changes_skey_opts(StartSeq, #mrargs{start_key=SKey, + start_key_docid=SKeyDocId}) -> + [{start_key, {[SKey, StartSeq+1], SKeyDocId}}]. + + +changes_ekey_opts(_StartSeq, #mrargs{end_key=undefined}) -> + []; +changes_ekey_opts(_StartSeq, #mrargs{end_key=EKey, + end_key_docid=EKeyDocId, + direction=Dir}=Args) -> + EndSeq = case Dir of + fwd -> 16#10000000; + rev -> 0 + end, + + case Args#mrargs.inclusive_end of + true -> [{end_key, {[EKey, EndSeq], EKeyDocId}}]; + false -> [{end_key_gt, {[EKey, EndSeq], EKeyDocId}}] + end. + + + +calculate_external_size(IdBt, LogBt, Views) -> + SumFun = fun + (#mrview{btree=Bt, seq_btree=nil}, Acc) -> + sum_btree_sizes(Acc, couch_btree:size(Bt)); + (#mrview{btree=Bt, seq_btree=SBt, key_byseq_btree=KSBt}, Acc) -> + Acc1 = sum_btree_sizes(Acc, couch_btree:size(Bt)), + Acc2 = sum_btree_sizes(Acc1, couch_btree:size(SBt)), + sum_btree_sizes(Acc2, couch_btree:size(KSBt)) + end, + Size = case LogBt of + nil -> + lists:foldl(SumFun, couch_btree:size(IdBt), Views); + _ -> + lists:foldl(SumFun, couch_btree:size(IdBt) + + couch_btree:size(LogBt), Views) end, - Size = lists:foldl(SumFun, 0, Views), {ok, Size}. @@ -669,6 +781,19 @@ expand_dups([KV | Rest], Acc) -> expand_dups(Rest, [KV | Acc]). 
+changes_expand_dups([], Acc) -> + lists:reverse(Acc); +changes_expand_dups([{{[Key, Seq], DocId}, {dups, Vals}} | Rest], Acc) -> + Expanded = [{{Key, Seq, DocId}, Val} || Val <- Vals], + changes_expand_dups(Rest, Expanded ++ Acc); +changes_expand_dups([{{Key, Seq}, {DocId, {dups, Vals}}} | Rest], Acc) -> + Expanded = [{{Key, Seq, DocId}, Val} || Val <- Vals], + changes_expand_dups(Rest, Expanded ++ Acc); +changes_expand_dups([{{[Key, Seq], DocId}, Val} | Rest], Acc) -> + changes_expand_dups(Rest, [{{Key, Seq, DocId}, Val} | Acc]); +changes_expand_dups([{{Key, Seq}, {DocId, Val}} | Rest], Acc) -> + changes_expand_dups(Rest, [{{Key, Seq, DocId}, Val} | Acc]). + maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) -> []; maybe_load_doc(Db, #doc_info{}=DI, #mrargs{conflicts=true, doc_options=Opts}) -> @@ -718,6 +843,9 @@ mrverror(Mesg) -> throw({query_parse_error, Mesg}). +to_key_seq(L) -> + [{{[Key, Seq], DocId}, Val} || {{Seq, Key}, {DocId, Val}} <- L]. + %% Updates 1.2.x or earlier view files to 1.3.x or later view files %% transparently, the first time the 1.2.x view file is opened by %% 1.3.x or later.