couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [couchdb] 04/04: implement partitioned views
Date Tue, 07 Aug 2018 14:46:47 GMT
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch user-partitioned-dbs-4
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit df768d1f9304637cc2e79e97a7031f2db69365fc
Author: Robert Newson <rnewson@apache.org>
AuthorDate: Tue Aug 7 15:44:33 2018 +0100

    implement partitioned views
---
 src/couch_mrview/src/couch_mrview_updater.erl | 14 +++++++--
 src/couch_mrview/src/couch_mrview_util.erl    | 41 ++++++++++++++++++++++++++-
 src/fabric/src/fabric_view.erl                | 19 +++++++++++--
 3 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f487..bfaf136 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -311,9 +311,11 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
     #mrst{
         id_btree=IdBtree,
         log_btree=LogBtree,
-        first_build=FirstBuild
+        first_build=FirstBuild,
+        design_opts=DesignOpts
     } = State,
 
+    Partitioned = couch_util:get_value(<<"partitioned">>, DesignOpts, false),
     Revs = dict:from_list(dict:fetch_keys(Log0)),
 
     Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
@@ -328,8 +330,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
     end,
 
-    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
         #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+        KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end,
         ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
         {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
         NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -378,6 +381,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         log_btree=LogBtree2
     }.
 
+inject_partition(KVs) ->
+    [{{[partition(DocId), Key], DocId}, Value} || {{Key, DocId}, Value} <- KVs].
+
+partition(DocId) ->
+    [Partition, _Rest] = binary:split(DocId, <<":">>),
+    Partition.
+
 update_id_btree(Btree, DocIdKeys, true) ->
     ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
     couch_btree:query_modify(Btree, [], ToAdd, []);
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 592bfb5..e4f06ff 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -38,6 +38,9 @@
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
 -define(GET_VIEW_RETRY_DELAY, 50).
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {[{<<239, 191, 176>>, null}]}). % is {"\ufff0": null}
+
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -588,7 +591,12 @@ validate_args(Args) ->
             mrverror(<<"`partition` parameter is not supported in this view.">>)
     end,
 
-    Args#mrargs{
+    Args1 = case get_extra(Args, partitioned, false) of
+        true  -> apply_partition(Args);
+        false -> Args
+    end,
+
+    Args1#mrargs{
         start_key_docid=SKDocId,
         end_key_docid=EKDocId,
         group_level=GroupLevel
@@ -606,6 +614,37 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) ->
 determine_group_level(#mrargs{group_level=GroupLevel}) ->
     GroupLevel.
 
+apply_partition(#mrargs{} = Args0) ->
+    case get_extra(Args0, partition_applied, false) of
+        true ->
+            Args0;
+        false ->
+            Partition = get_extra(Args0, partition),
+            Args1 = apply_partition(Partition, Args0),
+            set_extra(Args1, partition_applied, true)
+    end.
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=undefined, end_key=undefined} = Args) ->
+    Args#mrargs{start_key=[Partition, ?LOWEST_KEY], end_key=[Partition, ?HIGHEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=undefined, end_key=undefined} = Args) ->
+    Args#mrargs{start_key=[Partition, ?HIGHEST_KEY], end_key=[Partition, ?LOWEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=SK0, end_key=undefined} = Args) ->
+    Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, ?HIGHEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=SK0, end_key=undefined} = Args) ->
+    Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, ?LOWEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=undefined, end_key=EK0} = Args) ->
+    Args#mrargs{start_key=[Partition, ?LOWEST_KEY], end_key=[Partition, EK0]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=undefined, end_key=EK0} = Args) ->
+    Args#mrargs{start_key=[Partition, ?HIGHEST_KEY], end_key=[Partition, EK0]};
+
+apply_partition(Partition, #mrargs{start_key=SK0, end_key=EK0} = Args) ->
+    Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, EK0]}.
+
 
 check_range(#mrargs{start_key=undefined}, _Cmp) ->
     ok;
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index eae4cd6..994c739 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -119,8 +119,10 @@ maybe_send_row(State) ->
         counters = Counters,
         skip = Skip,
         limit = Limit,
-        user_acc = AccIn
+        user_acc = AccIn,
+        query_args = QueryArgs
     } = State,
+    Partitioned = couch_mrview_util:get_extra(QueryArgs, partitioned, false),
     case fabric_dict:any(0, Counters) of
     true ->
         {ok, State};
@@ -128,8 +130,14 @@ maybe_send_row(State) ->
         try get_next_row(State) of
         {_, NewState} when Skip > 0 ->
             maybe_send_row(NewState#collector{skip=Skip-1});
-        {Row, NewState} ->
-            case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+        {Row0, NewState} ->
+            Row1 = possibly_embed_doc(NewState, Row0),
+            Row2 = if
+                Partitioned -> detach_partition(Row1);
+                true -> Row1
+            end,
+            Row3 = transform_row(Row2),
+            case Callback(Row3, AccIn) of
             {stop, Acc} ->
                 {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
             {ok, Acc} ->
@@ -194,6 +202,11 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
         _ -> Row
     end.
 
+detach_partition(#view_row{key=[_Partition, Key]} = Row) ->
+    Row#view_row{key = Key};
+detach_partition(#view_row{key=null} = Row) ->
+    Row#view_row{key = null}.
+
 
 keydict(undefined) ->
     undefined;


Mime
View raw message