couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbast...@apache.org
Subject [01/41] couch-mrview commit: updated refs/heads/master to 28e51f3
Date Fri, 31 Oct 2014 19:53:16 GMT
Repository: couchdb-couch-mrview
Updated Branches:
  refs/heads/master 1a6bd404e -> 28e51f3f8


add couch_mrview:view_changes_since/{6,7} function

This function adds the possibility to get changes in a view since the
last updated sequence. You can get all changes in a view since a sequence,
or all changes for a key or a range in a view.

The following new secondary indexes are created to allow this feature:

- a generic log index to log the latest changes in views for a docid :
{DocId, [{ViewId, {Key, Seq, OP}}]} where OP can be del or add. This
index allows us to mark a key as removed if needed. It will be useful
later to help us to chain map/reduces operations or such things.

- a seq index associated to a view id : {ViewId, [{{Seq, Key}, {DocId, Val}}]} to look for all
changes in a view

- an index indexing keys by seq: {ViewId, [{{[Key, Seq], DocId}, Val}]},
  to look for changes associated with a key or a range

Note: all deleted keys are marked as deleted in the log index and their
value is {[{<<"_removed">>, true}]}.

To start indexing changes you need to set the option {seq_indexed:
true} in the "options" field of the design document.

Caveat: when the changes are indexed, the size of the index is significantly larger.

Example of usage:  https://www.friendpaste.com/5Y6gihQReaxd8ERqbDom3y

Conflicts:
	src/couch_mrview_updater.erl
	src/couch_mrview_util.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/44c07c30
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/44c07c30
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/44c07c30

Branch: refs/heads/master
Commit: 44c07c30ad2b39a528b9e3aa5aaa4ea1bc07088f
Parents: 1a6bd40
Author: benoitc <benoitc@apache.org>
Authored: Sun Jan 26 23:56:09 2014 +0100
Committer: Benjamin Bastian <benjamin.bastian@gmail.com>
Committed: Thu Oct 30 13:38:30 2014 -0700

----------------------------------------------------------------------
 include/couch_mrview.hrl     |   7 +-
 src/couch_mrview.erl         |  48 ++++++++++
 src/couch_mrview_index.erl   |   7 +-
 src/couch_mrview_updater.erl | 184 +++++++++++++++++++++++++++++--------
 src/couch_mrview_util.erl    | 186 ++++++++++++++++++++++++++++++++------
 5 files changed, 364 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/include/couch_mrview.hrl
----------------------------------------------------------------------
diff --git a/include/couch_mrview.hrl b/include/couch_mrview.hrl
index 36c35d6..f1911db 100644
--- a/include/couch_mrview.hrl
+++ b/include/couch_mrview.hrl
@@ -18,12 +18,13 @@
     idx_name,
     language,
     design_opts=[],
+    seq_indexed=false,
     lib,
     views,
     id_btree=nil,
+    log_btree=nil,
     update_seq=0,
     purge_seq=0,
-
     first_build,
     partial_resp_pid,
     doc_acc,
@@ -41,6 +42,9 @@
     reduce_funs=[],
     def,
     btree=nil,
+    seq_btree=nil,
+    key_byseq_btree=nil,
+    seq_indexed=false,
     options=[]
 }).
 
@@ -49,6 +53,7 @@
     seq=0,
     purge_seq=0,
     id_btree_state=nil,
+    log_btree_state=nil,
     view_states=nil
 }).
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview.erl b/src/couch_mrview.erl
index 047bc00..afb6f1e 100644
--- a/src/couch_mrview.erl
+++ b/src/couch_mrview.erl
@@ -15,6 +15,7 @@
 -export([validate/2]).
 -export([query_all_docs/2, query_all_docs/4]).
 -export([query_view/3, query_view/4, query_view/6]).
+-export([view_changes_since/6, view_changes_since/7]).
 -export([get_info/2]).
 -export([trigger_update/2, trigger_update/3]).
 -export([compact/2, compact/3, cancel_compaction/2]).
@@ -135,6 +136,30 @@ query_view(Db, {Type, View, Ref}, Args, Callback, Acc) ->
         erlang:demonitor(Ref, [flush])
     end.
 
+view_changes_since(Db, DDoc, VName, StartSeq, Fun, Acc) ->
+    view_changes_since(Db, DDoc, VName, StartSeq, Fun, [], Acc).
+
+view_changes_since(Db, DDoc, VName, StartSeq, Fun, Options, Acc) ->
+    Args0 = make_view_changes_args(Options),
+    {ok, {_, View}, _, Args} = couch_mrview_util:get_view(Db, DDoc, VName,
+                                                          Args0),
+    case View#mrview.seq_indexed of
+        true ->
+            OptList = make_view_changes_opts(StartSeq, Options, Args),
+            Btree = case is_key_byseq(Options) of
+                true -> View#mrview.key_byseq_btree;
+                _ -> View#mrview.seq_btree
+            end,
+            io:format("opt list ~p~n", [OptList]),
+            AccOut = lists:foldl(fun(Opts, Acc0) ->
+                        {ok, _R, A} = couch_mrview_util:fold_changes(
+                                    Btree, Fun, Acc0, Opts),
+                        A
+                end, Acc, OptList),
+            {ok, AccOut};
+        _ ->
+            {error, seqs_not_indexed}
+    end.
 
 get_info(Db, DDocId) when is_binary(DDocId) ->
     DbName = mem3:dbname(Db#db.name),
@@ -447,3 +472,26 @@ lookup_index(Key) ->
         record_info(fields, mrargs), lists:seq(2, record_info(size, mrargs))
     ),
     couch_util:get_value(Key, Index).
+
+
+is_key_byseq(Options) ->
+    lists:any(fun({K, _}) ->
+                lists:member(K, [start_key, end_key, start_key_docid,
+                                 end_key_docid, keys])
+        end, Options).
+
+make_view_changes_args(Options) ->
+    case is_key_byseq(Options) of
+        true ->
+            to_mrargs(Options);
+        false ->
+            #mrargs{}
+    end.
+
+make_view_changes_opts(StartSeq, Options, Args) ->
+    case is_key_byseq(Options) of
+        true ->
+            couch_mrview_util:changes_key_opts(StartSeq, Args);
+        false ->
+            [[{start_key, {StartSeq+1, <<>>}}] ++ Options]
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_index.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_index.erl b/src/couch_mrview_index.erl
index 2d6ccae..0b9b236 100644
--- a/src/couch_mrview_index.erl
+++ b/src/couch_mrview_index.erl
@@ -51,14 +51,17 @@ get(Property, State) ->
             #mrst{
                 fd = Fd,
                 sig = Sig,
-                id_btree = Btree,
+                id_btree = IdBtree,
+                log_btree = LogBtree,
                 language = Lang,
                 update_seq = UpdateSeq,
                 purge_seq = PurgeSeq,
                 views = Views
             } = State,
             {ok, FileSize} = couch_file:bytes(Fd),
-            {ok, ExternalSize} = couch_mrview_util:calculate_external_size(Views),
+            {ok, ExternalSize} = couch_mrview_util:calculate_external_size(IdBtree,
+                                                                           LogBtree,
+                                                                           Views),
             ActiveSize = ExternalSize + couch_btree:size(Btree),
             {ok, [
                 {signature, list_to_binary(couch_index_util:hexsig(Sig))},

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_updater.erl b/src/couch_mrview_updater.erl
index 1525a4c..eb06c92 100644
--- a/src/couch_mrview_updater.erl
+++ b/src/couch_mrview_updater.erl
@@ -141,12 +141,12 @@ map_docs(Parent, State0) ->
                 ({nil, Seq, _}, {SeqAcc, Results}) ->
                     {erlang:max(Seq, SeqAcc), Results};
                 ({Id, Seq, deleted}, {SeqAcc, Results}) ->
-                    {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
+                    {erlang:max(Seq, SeqAcc), [{Id, Seq, []} | Results]};
                 ({Id, Seq, Doc}, {SeqAcc, Results}) ->
                     couch_stats:increment_counter([couchdb, mrview, map_docs],
                                                   1),
                     {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
-                    {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
+                    {erlang:max(Seq, SeqAcc), [{Id, Seq, Res} | Results]}
             end,
             FoldFun = fun(Docs, Acc) ->
                 update_task(length(Docs)),
@@ -162,8 +162,8 @@ write_results(Parent, State) ->
     case accumulate_writes(State, State#mrst.write_queue, nil) of
         stop ->
             Parent ! {new_state, State};
-        {Go, {Seq, ViewKVs, DocIdKeys}} ->
-            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
+        {Go, {Seq, ViewKVs, DocIdKeys, Log}} ->
+            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Log),
             if Go == stop ->
                 Parent ! {new_state, NewState};
             true ->
@@ -185,17 +185,17 @@ start_query_server(State) ->
 
 
 accumulate_writes(State, W, Acc0) ->
-    {Seq, ViewKVs, DocIdKVs} = case Acc0 of
-        nil -> {0, [{V#mrview.id_num, []} || V <- State#mrst.views], []};
+    {Seq, ViewKVs, DocIdKVs, Log} = case Acc0 of
+        nil -> {0, [{V#mrview.id_num, {[], []}} || V <- State#mrst.views], [], dict:new()};
         _ -> Acc0
     end,
     case couch_work_queue:dequeue(W) of
         closed when Seq == 0 ->
             stop;
         closed ->
-            {stop, {Seq, ViewKVs, DocIdKVs}};
+            {stop, {Seq, ViewKVs, DocIdKVs, Log}};
         {ok, Info} ->
-            {_, _, NewIds} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs),
+            {_, _, NewIds, _} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs, Log),
             case accumulate_more(length(NewIds)) of
                 true -> accumulate_writes(State, W, Acc);
                 false -> {ok, Acc}
@@ -212,66 +212,100 @@ accumulate_more(NumDocIds) ->
         andalso CurrMem < list_to_integer(MinSize).
 
 
-merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
-    {SeqAcc, ViewKVs, DocIdKeys};
-merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) ->
-    Fun = fun(RawResults, {VKV, DIK}) ->
-        merge_results(RawResults, VKV, DIK)
+merge_results([], SeqAcc, ViewKVs, DocIdKeys, Log) ->
+    {SeqAcc, ViewKVs, DocIdKeys, Log};
+merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys, Log) ->
+    Fun = fun(RawResults, {VKV, DIK, Log2}) ->
+        merge_results(RawResults, VKV, DIK, Log2)
     end,
-    {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results),
-    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1).
+    {ViewKVs1, DocIdKeys1, Log1} = lists:foldl(Fun, {ViewKVs, DocIdKeys, Log},
+                                               Results),
+    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1,
+                  Log1).
 
 
-merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
-    {ViewKVs, [{DocId, []} | DocIdKeys]};
-merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
+merge_results({DocId, _Seq, []}, ViewKVs, DocIdKeys, Log) ->
+    {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, [], Log)};
+merge_results({DocId, Seq, RawResults}, ViewKVs, DocIdKeys, Log) ->
     JsonResults = couch_query_servers:raw_to_ejson(RawResults),
     Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults],
-    {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []),
-    {ViewKVs1, [ViewIdKeys | DocIdKeys]}.
+    {ViewKVs1, ViewIdKeys, Log1} = insert_results(DocId, Seq, Results, ViewKVs, [],
+                                            [], Log),
+    {ViewKVs1, [ViewIdKeys | DocIdKeys], Log1}.
 
 
-insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
-    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
-insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
+insert_results(DocId, _Seq, [], [], ViewKVs, ViewIdKeys, Log) ->
+    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}, Log};
+insert_results(DocId, Seq, [KVs | RKVs], [{Id, {VKVs, SKVs}} | RVKVs], VKVAcc,
+               VIdKeys, Log) ->
     CombineDupesFun = fun
-        ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) ->
-            {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys};
-        ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) ->
-            {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys};
-        ({Key, _}=KV, {Rest, IdKeys}) ->
-            {[KV | Rest], [{Id, Key} | IdKeys]}
+        ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys, Log2}) ->
+            {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys, Log2};
+        ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys, Log2}) ->
+            {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys, Log2};
+        ({Key, _}=KV, {Rest, IdKeys, Log2}) ->
+            {[KV | Rest], [{Id, Key} | IdKeys],
+             dict:append(DocId, {Id, {Key, Seq, add}}, Log2)}
     end,
-    InitAcc = {[], VIdKeys},
+    InitAcc = {[], VIdKeys, Log},
     couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)),
-    {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)),
+    {Duped, VIdKeys0, Log1} = lists:foldl(CombineDupesFun, InitAcc,
+                                          lists:sort(KVs)),
     FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
-    insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).
+    FinalSKVs = [{{Seq, Key}, {DocId, Val}} || {Key, Val} <- Duped] ++ SKVs,
+    insert_results(DocId, Seq, RKVs, RVKVs,
+                  [{Id, {FinalKVs, FinalSKVs}} | VKVAcc], VIdKeys0, Log1).
 
 
-write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
+write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Log) ->
     #mrst{
         id_btree=IdBtree,
+        log_btree=LogBtree,
         first_build=FirstBuild
     } = State,
 
     {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
     ToRemByView = collapse_rem_keys(ToRemove, dict:new()),
 
-    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}) ->
+    {ok, SeqsToAdd, SeqsToRemove, LogBtree2} = case LogBtree of
+        nil -> {ok, undefined, undefined, nil};
+        _ -> update_log(LogBtree, Log, UpdateSeq, FirstBuild)
+    end,
+
+    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
         ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
         {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
         NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
             true -> UpdateSeq;
             _ -> View#mrview.update_seq
         end,
-        View#mrview{btree=VBtree2, update_seq=NewUpdateSeq}
+
+        %% store the view changes.
+        {SeqBtree2, KeyBySeqBtree2} = case View#mrview.seq_indexed of
+            true ->
+                SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []),
+                SToAdd = couch_util:dict_find(ViewId, SeqsToAdd, []),
+                SKVs1 = SKVs ++ SToAdd,
+                {ok, SBt} = couch_btree:add_remove(View#mrview.seq_btree,
+                                                   SKVs1, SToRem),
+
+                {ok, KSbt} = couch_btree:add_remove(View#mrview.key_byseq_btree,
+                                                    couch_mrview_util:to_key_seq(SKVs1),
+                                                    couch_mrview_util:to_key_seq(SToRem)),
+                {SBt, KSbt};
+            _ -> {nil, nil}
+        end,
+        View#mrview{btree=VBtree2,
+                    seq_btree=SeqBtree2,
+                    key_byseq_btree=KeyBySeqBtree2,
+                    update_seq=NewUpdateSeq}
     end,
 
     State#mrst{
         views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
         update_seq=UpdateSeq,
-        id_btree=IdBtree2
+        id_btree=IdBtree2,
+        log_btree=LogBtree2
     }.
 
 
@@ -284,6 +318,84 @@ update_id_btree(Btree, DocIdKeys, _) ->
     ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
     couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).
 
+walk_log(BTree, Fun, Acc, Ids) ->
+    WrapFun = fun(KV, _Offset, Acc2) ->
+            Fun(KV, Acc2)
+    end,
+    lists:foldl(fun(Id, Acc1) ->
+                Opt = [{start_key, Id}, {end_key, Id}],
+                {ok, _, A} = couch_btree:fold(BTree, WrapFun, Acc1, Opt),
+                A
+        end, Acc, Ids).
+
+update_log(Btree, Log, _UpdatedSeq, true) ->
+    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log),
+                             DIKeys /= []],
+    {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []),
+    {ok, dict:new(), dict:new(), LogBtree2};
+update_log(Btree, Log, UpdatedSeq, _) ->
+    %% build list of updated keys and Id
+    {ToLook, Updated} = dict:fold(fun
+                (Id, [], {IdsAcc, KeysAcc}) ->
+                    {[Id | IdsAcc], KeysAcc};
+                (Id, DIKeys, {IdsAcc, KeysAcc}) ->
+                    KeysAcc1 = lists:foldl(fun({ViewId, {Key, _Seq, _Op}},
+                                               KeysAcc2) ->
+                                    [{Id, ViewId, Key} | KeysAcc2]
+                            end, KeysAcc, DIKeys),
+                {[Id | IdsAcc], KeysAcc1} end, {[], []}, Log),
+
+    io:format("updated ~p~n", [Updated]),
+    RemValue = {[{<<"_removed">>, true}]},
+    {Log1, AddAcc, DelAcc} = walk_log(Btree, fun({DocId, VIdKeys},
+                                                          {Log2, AddAcc2, DelAcc2}) ->
+
+                {Log3, AddAcc3, DelAcc3} = lists:foldl(fun({ViewId,{Key, Seq, Op}},
+                                                           {Log4, AddAcc4, DelAcc4}) ->
+
+                            case lists:member({DocId, ViewId, Key}, Updated) of
+                                true ->
+                                    %% the log is updated, deleted old
+                                    %% record from the view
+                                    DelAcc5 = dict:append(ViewId, {Seq, Key},
+                                                        DelAcc4),
+                                    {Log4, AddAcc4, DelAcc5};
+                                false when Op /= del ->
+                                    %% an update operation has been
+                                    %% logged for this key. We must now
+                                    %% record it as deleted in the
+                                    %% log, remove the old record in
+                                    %% the view and update the view
+                                    %% with a removed record.
+                                    Log5 = dict:append(DocId,
+                                                       {ViewId,
+                                                        {Key,UpdatedSeq, del}},
+                                                       Log4),
+                                    DelAcc5 = dict:append(ViewId, {Seq, Key},
+                                                        DelAcc4),
+                                    AddAcc5 = dict:append(ViewId,
+                                                          {{UpdatedSeq, Key},
+                                                           {DocId, RemValue}},
+                                                          AddAcc4),
+                                    {Log5, AddAcc5, DelAcc5};
+                                false ->
+                                    %% the key has already been
+                                    %% registered in the view as
+                                    %% deleted, make sure to add it
+                                    %% to the new log.
+                                    Log5 = dict:append(DocId,
+                                                       {ViewId,
+                                                        {Key, Seq, del}}, Log4),
+                                    {Log5, AddAcc4, DelAcc4}
+                            end
+                    end, {Log2, AddAcc2, DelAcc2}, VIdKeys),
+                    {ok, {Log3, AddAcc3, DelAcc3}}
+            end, {Log, dict:new(), dict:new()}, ToLook),
+
+    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log1), DIKeys /= []],
+    %% store the new logs
+    {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []),
+    {ok, AddAcc, DelAcc, LogBtree2}.
 
 collapse_rem_keys([], Acc) ->
     Acc;

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/44c07c30/src/couch_mrview_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_util.erl b/src/couch_mrview_util.erl
index 7fc60f8..cd25887 100644
--- a/src/couch_mrview_util.erl
+++ b/src/couch_mrview_util.erl
@@ -21,13 +21,16 @@
 -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
 -export([fold/4, fold_reduce/4]).
 -export([temp_view_to_ddoc/1]).
--export([calculate_external_size/1]).
+-export([calculate_external_size/3]).
 -export([validate_args/1]).
 -export([maybe_load_doc/3, maybe_load_doc/4]).
 -export([maybe_update_index_file/1]).
 -export([extract_view/4, extract_view_reduce/1]).
 -export([get_view_keys/1, get_view_queries/1]).
 -export([set_view_type/3]).
+-export([changes_key_opts/2]).
+-export([fold_changes/4]).
+-export([to_key_seq/1]).
 
 -define(MOD, couch_mrview_index).
 
@@ -91,15 +94,18 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
                             [Name, Else]),
             DictBySrcAcc
     end,
+    {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
+    SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
+
     {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
     BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
 
-    NumViews = fun({_, View}, N) -> {View#mrview{id_num=N}, N+1} end,
+    NumViews = fun({_, View}, N) ->
+            {View#mrview{id_num=N, seq_indexed=SeqIndexed}, N+1}
+    end,
     {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))),
 
     Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
-    {DesignOpts} = couch_util:get_value(<<"options">>, Fields, {[]}),
-    {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
     Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),
 
     IdxState = #mrst{
@@ -108,7 +114,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
         lib=Lib,
         views=Views,
         language=Language,
-        design_opts=DesignOpts
+        design_opts=DesignOpts,
+        seq_indexed=SeqIndexed
     },
     SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
     {ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}.
@@ -152,7 +159,8 @@ view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
     BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
     UpdateSeq = couch_db:get_update_seq(Db),
     PurgeSeq = couch_db:get_purge_seq(Db),
-    Bin = term_to_binary({BaseSig, UpdateSeq, PurgeSeq}),
+    Bin = term_to_binary({BaseSig, UpdateSeq, PurgeSeq,
+                          State#mrst.seq_indexed}),
     couch_index_util:hexsig(couch_util:md5(Bin));
 view_sig(Db, State, {_Nth, _Lang, View}, Args) ->
     view_sig(Db, State, View, Args);
@@ -160,11 +168,12 @@ view_sig(_Db, State, View, Args0) ->
     Sig = State#mrst.sig,
     UpdateSeq = View#mrview.update_seq,
     PurgeSeq = View#mrview.purge_seq,
+    SeqIndexed = View#mrview.seq_indexed,
     Args = Args0#mrargs{
         preflight_fun=undefined,
         extra=[]
     },
-    Bin = term_to_binary({Sig, UpdateSeq, PurgeSeq, Args}),
+    Bin = term_to_binary({Sig, UpdateSeq, PurgeSeq, SeqIndexed, Args}),
     couch_index_util:hexsig(couch_util:md5(Bin)).
 
 
@@ -173,7 +182,8 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
         seq=0,
         purge_seq=couch_db:get_purge_seq(Db),
         id_btree_state=nil,
-        view_states=[{nil, 0, 0} || _ <- Views]
+        log_btree_state=nil,
+        view_states=[{nil, nil, nil, 0, 0} || _ <- Views]
     },
     init_state(Db, Fd, State, Header);
 % read <= 1.2.x header record and transpile it to >=1.3.x
@@ -187,25 +197,31 @@ init_state(Db, Fd, State, #index_header{
         seq=Seq,
         purge_seq=PurgeSeq,
         id_btree_state=IdBtreeState,
-        view_states=ViewStates
+        log_btree_state=nil,
+        view_states=[{Bt, nil, nil, USeq, PSeq} || {Bt, USeq, PSeq} <- ViewStates]
         });
 init_state(Db, Fd, State, Header) ->
-    #mrst{language=Lang, views=Views} = State,
+    #mrst{language=Lang, views=Views, seq_indexed=SeqIndexed} = State,
     #mrheader{
         seq=Seq,
         purge_seq=PurgeSeq,
         id_btree_state=IdBtreeState,
+        log_btree_state=LogBtreeState,
         view_states=ViewStates
     } = Header,
 
     StateUpdate = fun
-        ({_, _, _}=St) -> St;
-        (St) -> {St, 0, 0}
+        ({_, _, _, _, _}=St) -> St;
+        (St) -> {St, nil, nil, 0, 0}
     end,
     ViewStates2 = lists:map(StateUpdate, ViewStates),
 
     IdBtOpts = [{compression, couch_db:compression(Db)}],
     {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts),
+    {ok, LogBtree} = case SeqIndexed of
+        true -> couch_btree:open(LogBtreeState, Fd, IdBtOpts);
+        false -> {ok, nil}
+    end,
 
     OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
     Views2 = lists:zipwith(OpenViewFun, ViewStates2, Views),
@@ -216,11 +232,20 @@ init_state(Db, Fd, State, Header) ->
         update_seq=Seq,
         purge_seq=PurgeSeq,
         id_btree=IdBtree,
+        log_btree=LogBtree,
         views=Views2
     }.
 
+less_json_seqs({SeqA, JsonA}, {SeqB, JsonB}) ->
+    case couch_ejson_compare:less(SeqA, SeqB) of
+        0 ->
+            couch_ejson_compare:less_json(JsonA, JsonB);
+        Result ->
+            Result < 0
+    end.
+
 
-open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) ->
+open_view(Db, Fd, Lang, {BTState, SeqBTState, KSeqBTState, USeq, PSeq}, View) ->
     FunSrcs = [FunSrc || {_Name, FunSrc} <- View#mrview.reduce_funs],
     ReduceFun =
         fun(reduce, KVs) ->
@@ -245,7 +270,23 @@ open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) ->
         {compression, couch_db:compression(Db)}
     ],
     {ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts),
-    View#mrview{btree=Btree, update_seq=USeq, purge_seq=PSeq}.
+
+    {SeqBtree, KeyBySeqBtree} = case View#mrview.seq_indexed of
+        true ->
+            ViewSeqBtOpts = [{less, fun less_json_seqs/2},
+                             {compression, couch_db:compression(Db)}],
+            {ok, SBt} = couch_btree:open(SeqBTState, Fd, ViewSeqBtOpts),
+            {ok, KSBt} = couch_btree:open(KSeqBTState, Fd, ViewBtOpts),
+            {SBt, KSBt};
+        false ->
+            {nil, nil}
+    end,
+
+    View#mrview{btree=Btree,
+                seq_btree=SeqBtree,
+                key_byseq_btree=KeyBySeqBtree,
+                update_seq=USeq,
+                purge_seq=PSeq}.
 
 
 temp_view_to_ddoc({Props}) ->
@@ -300,7 +341,6 @@ fold(#mrview{btree=Bt}, Fun, Acc, Opts) ->
     end,
     {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts).
 
-
 fold_fun(_Fun, [], _, Acc) ->
     {ok, Acc};
 fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
@@ -311,6 +351,12 @@ fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
             {stop, Acc2}
     end.
 
+fold_changes(Bt, Fun, Acc, Opts) ->
+    WrapperFun = fun(KV, _Reds, Acc2) ->
+        Fun(changes_expand_dups([KV], []), Acc2)
+    end,
+    {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts).
+
 
 fold_reduce({NthRed, Lang, View}, Fun,  Acc, Options) ->
     #mrview{
@@ -495,21 +541,34 @@ make_header(State) ->
         update_seq=Seq,
         purge_seq=PurgeSeq,
         id_btree=IdBtree,
+        log_btree=LogBtree,
         views=Views
     } = State,
-    ViewStates = [
-        {
-            couch_btree:get_state(V#mrview.btree),
-            V#mrview.update_seq,
-            V#mrview.purge_seq
-        }
-        ||
-        V <- Views
-    ],
+
+    ViewStates = lists:foldr(fun(V, Acc) ->
+                    {SeqBtState, KSeqBtState} = case V#mrview.seq_indexed of
+                        true ->
+                            {couch_btree:get_state(V#mrview.seq_btree),
+                             couch_btree:get_state(V#mrview.key_byseq_btree)};
+                        _ -> {nil, nil}
+                    end,
+                    [{couch_btree:get_state(V#mrview.btree),
+                      SeqBtState,
+                      KSeqBtState,
+                      V#mrview.update_seq,
+                      V#mrview.purge_seq} | Acc]
+            end, [], Views),
+
+    LogBtreeState = case LogBtree of
+        nil -> nil;
+        _ -> couch_btree:get_state(LogBtree)
+    end,
+
     #mrheader{
         seq=Seq,
         purge_seq=PurgeSeq,
         id_btree_state=couch_btree:get_state(IdBtree),
+        log_btree_state= LogBtreeState,
         view_states=ViewStates
     }.
 
@@ -567,7 +626,9 @@ reset_state(State) ->
         qserver=nil,
         update_seq=0,
         id_btree=nil,
-        views=[View#mrview{btree=nil} || View <- State#mrst.views]
+        log_btree=nil,
+        views=[View#mrview{btree=nil, seq_btree=nil, key_byseq_btree=nil}
+               || View <- State#mrst.views]
     }.
 
 
@@ -636,11 +697,62 @@ reverse_key_default(<<255>>) -> <<>>;
 reverse_key_default(Key) -> Key.
 
 
-calculate_external_size(Views) ->
-    SumFun = fun(#mrview{btree=Bt}, Acc) ->
-        sum_btree_sizes(Acc, couch_btree:size(Bt))
+changes_key_opts(StartSeq, Args) ->
+    changes_key_opts(StartSeq, Args, []).
+
+
+changes_key_opts(StartSeq, #mrargs{keys=undefined, direction=Dir}=Args, Extra) ->
+    [[{dir, Dir}] ++ changes_skey_opts(StartSeq, Args) ++
+     changes_ekey_opts(StartSeq, Args) ++ Extra];
+changes_key_opts(StartSeq, #mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
+    lists:map(fun(K) ->
+        [{dir, Dir}]
+        ++ changes_skey_opts(StartSeq, Args#mrargs{start_key=K})
+        ++ changes_ekey_opts(StartSeq, Args#mrargs{end_key=K})
+        ++ Extra
+    end, Keys).
+
+
+changes_skey_opts(StartSeq, #mrargs{start_key=undefined}) ->
+    [{start_key, [<<>>, StartSeq+1]}];
+changes_skey_opts(StartSeq, #mrargs{start_key=SKey,
+                                    start_key_docid=SKeyDocId}) ->
+    [{start_key, {[SKey, StartSeq+1], SKeyDocId}}].
+
+
+changes_ekey_opts(_StartSeq, #mrargs{end_key=undefined}) ->
+    [];
+changes_ekey_opts(_StartSeq, #mrargs{end_key=EKey,
+                                    end_key_docid=EKeyDocId,
+                                    direction=Dir}=Args) ->
+    EndSeq = case Dir of
+        fwd -> 16#10000000;
+        rev -> 0
+    end,
+
+    case Args#mrargs.inclusive_end of
+        true -> [{end_key, {[EKey, EndSeq], EKeyDocId}}];
+        false -> [{end_key_gt, {[EKey, EndSeq], EKeyDocId}}]
+    end.
+
+
+
+calculate_external_size(IdBt, LogBt, Views) ->
+    SumFun = fun
+        (#mrview{btree=Bt, seq_btree=nil}, Acc) ->
+            sum_btree_sizes(Acc, couch_btree:size(Bt));
+        (#mrview{btree=Bt, seq_btree=SBt, key_byseq_btree=KSBt}, Acc) ->
+            Acc1 = sum_btree_sizes(Acc, couch_btree:size(Bt)),
+            Acc2 = sum_btree_sizes(Acc1, couch_btree:size(SBt)),
+            sum_btree_sizes(Acc2, couch_btree:size(KSBt))
+    end,
+    Size = case LogBt of
+        nil ->
+            lists:foldl(SumFun, couch_btree:size(IdBt), Views);
+        _ ->
+            lists:foldl(SumFun, couch_btree:size(IdBt) +
+                        couch_btree:size(LogBt), Views)
     end,
-    Size = lists:foldl(SumFun, 0, Views),
     {ok, Size}.
 
 
@@ -669,6 +781,19 @@ expand_dups([KV | Rest], Acc) ->
     expand_dups(Rest, [KV | Acc]).
 
 
+changes_expand_dups([], Acc) ->
+    lists:reverse(Acc);
+changes_expand_dups([{{[Key, Seq], DocId}, {dups, Vals}} | Rest], Acc) ->
+    Expanded = [{{Key, Seq, DocId}, Val} || Val <- Vals],
+    changes_expand_dups(Rest, Expanded ++ Acc);
+changes_expand_dups([{{Key, Seq}, {DocId, {dups, Vals}}} | Rest], Acc) ->
+    Expanded = [{{Key, Seq, DocId}, Val} || Val <- Vals],
+    changes_expand_dups(Rest, Expanded ++ Acc);
+changes_expand_dups([{{[Key, Seq], DocId}, Val} | Rest], Acc) ->
+    changes_expand_dups(Rest, [{{Key, Seq, DocId}, Val} | Acc]);
+changes_expand_dups([{{Key, Seq}, {DocId, Val}} | Rest], Acc) ->
+    changes_expand_dups(Rest, [{{Key, Seq, DocId}, Val} | Acc]).
+
 maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) ->
     [];
 maybe_load_doc(Db, #doc_info{}=DI, #mrargs{conflicts=true, doc_options=Opts}) ->
@@ -718,6 +843,9 @@ mrverror(Mesg) ->
     throw({query_parse_error, Mesg}).
 
 
+to_key_seq(L) ->
+    [{{[Key, Seq], DocId}, Val} || {{Seq, Key}, {DocId, Val}} <- L].
+
 %% Updates 1.2.x or earlier view files to 1.3.x or later view files
 %% transparently, the first time the 1.2.x view file is opened by
 %% 1.3.x or later.


Mime
View raw message