couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 03/06: Implement partitioned views
Date Tue, 30 Oct 2018 20:59:14 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 657fda6c9bfdf7f84f21a00cffa0c21a43f42636
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Thu Oct 25 14:19:07 2018 -0500

    Implement partitioned views
    
    The benefit of using partitioned databases is that views can then be
    scoped to a single shard range. This allows for views to scale nearly as
    linearly as document lookups.
    
    Co-authored-by: Garren Smith <garren.smith@gmail.com>
    Co-authored-by: Robert Newson <rnewson@apache.org>
---
 src/chttpd/src/chttpd_db.erl                  |  45 ++++++++-
 src/chttpd/src/chttpd_httpd_handlers.erl      |   1 +
 src/chttpd/src/chttpd_view.erl                |   2 +-
 src/chttpd/test/chttpd_db_bulk_get_test.erl   |  12 +--
 src/couch/src/couch_btree.erl                 |   6 +-
 src/couch/src/couch_ejson_compare.erl         |   4 +
 src/couch_mrview/include/couch_mrview.hrl     |   1 +
 src/couch_mrview/src/couch_mrview.erl         |  17 +++-
 src/couch_mrview/src/couch_mrview_http.erl    |   5 +-
 src/couch_mrview/src/couch_mrview_index.erl   |  34 ++++++-
 src/couch_mrview/src/couch_mrview_updater.erl |  13 ++-
 src/couch_mrview/src/couch_mrview_util.erl    | 129 +++++++++++++++++++++++++-
 src/fabric/src/fabric.erl                     |  17 ++--
 src/fabric/src/fabric_util.erl                |  22 ++++-
 src/fabric/src/fabric_view.erl                |  32 +++++--
 src/fabric/src/fabric_view_all_docs.erl       |   5 +-
 src/fabric/src/fabric_view_map.erl            |   5 +-
 src/fabric/src/fabric_view_reduce.erl         |   5 +-
 18 files changed, 308 insertions(+), 47 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 3d6c79f..95c18ab 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -19,7 +19,8 @@
     db_req/2, couch_doc_open/4,handle_changes_req/2,
     update_doc_result_to_json/1, update_doc_result_to_json/2,
     handle_design_info_req/3, handle_view_cleanup_req/2,
-    update_doc/4, http_code_from_status/1]).
+    update_doc/4, http_code_from_status/1,
+    handle_partition_req/2]).
 
 -import(chttpd,
     [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -251,6 +252,40 @@ handle_view_cleanup_req(Req, Db) ->
     ok = fabric:cleanup_index_files_all_nodes(Db),
     send_json(Req, 202, {[{ok, true}]}).
 
+
+handle_partition_req(#httpd{method='GET', path_parts=[_,_,PartId]}=Req, Db) ->
+    case couch_db:is_partitioned(Db) of
+        true ->
+            {ok, PartitionInfo} = fabric:get_partition_info(Db, PartId),
+            send_json(Req, {PartitionInfo});
+        false ->
+            throw({bad_request, <<"database is not partitioned">>})
+    end;
+
+handle_partition_req(#httpd{path_parts = [_, _, _]}=Req, _Db) ->
+    send_method_not_allowed(Req, "GET");
+
+handle_partition_req(#httpd{path_parts=[DbName, _, PartId | Rest]}=Req, Db) ->
+    case couch_db:is_partitioned(Db) of
+        true ->
+            QS = chttpd:qs(Req),
+            NewQS = lists:ukeysort(1, [{"partition", ?b2l(PartId)} | QS]),
+            NewReq = Req#httpd{
+                path_parts = [DbName | Rest],
+                qs = NewQS
+            },
+            case Rest of
+                [] ->
+                    do_db_req(NewReq, Db);
+                [SecondPart|_] ->
+                    Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2),
+                    do_db_req(NewReq, Handler)
+            end;
+        false ->
+            throw({bad_request, <<"database is not partitioned">>})
+    end.
+
+
 handle_design_req(#httpd{
         path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest]
     }=Req, Db) ->
@@ -697,7 +732,7 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
     ArgQueries = lists:map(fun({Query}) ->
         QueryArg1 = couch_mrview_http:parse_params(Query, undefined,
             Args1, [decoded]),
-        QueryArgs2 = couch_mrview_util:validate_args(QueryArg1),
+        QueryArgs2 = fabric_util:validate_all_docs_args(Db, QueryArg1),
         set_namespace(OP, QueryArgs2)
     end, Queries),
     Options = [{user_ctx, Req#httpd.user_ctx}],
@@ -717,7 +752,7 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
 all_docs_view(Req, Db, Keys, OP) ->
     Args0 = couch_mrview_http:parse_params(Req, Keys),
     Args1 = Args0#mrargs{view_type=map},
-    Args2 = couch_mrview_util:validate_args(Args1),
+    Args2 = fabric_util:validate_all_docs_args(Db, Args1),
     Args3 = set_namespace(OP, Args2),
     Options = [{user_ctx, Req#httpd.user_ctx}],
     Max = chttpd:chunked_response_buffer_size(),
@@ -1659,8 +1694,8 @@ set_namespace(<<"_local_docs">>, Args) ->
     set_namespace(<<"_local">>, Args);
 set_namespace(<<"_design_docs">>, Args) ->
     set_namespace(<<"_design">>, Args);
-set_namespace(NS, #mrargs{extra = Extra} = Args) ->
-    Args#mrargs{extra = [{namespace, NS} | Extra]}.
+set_namespace(NS, #mrargs{} = Args) ->
+    couch_mrview_util:set_extra(Args, namespace, NS).
 
 
 %% /db/_bulk_get stuff
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index cb52e2c..000f29b 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -32,6 +32,7 @@ url_handler(_) -> no_match.
 db_handler(<<"_view_cleanup">>) -> fun chttpd_db:handle_view_cleanup_req/2;
 db_handler(<<"_compact">>)      -> fun chttpd_db:handle_compact_req/2;
 db_handler(<<"_design">>)       -> fun chttpd_db:handle_design_req/2;
+db_handler(<<"_partition">>)    -> fun chttpd_db:handle_partition_req/2;
 db_handler(<<"_temp_view">>)    -> fun chttpd_view:handle_temp_view_req/2;
 db_handler(<<"_changes">>)      -> fun chttpd_db:handle_changes_req/2;
 db_handler(_) -> no_match.
diff --git a/src/chttpd/src/chttpd_view.erl b/src/chttpd/src/chttpd_view.erl
index 3c05c64..e0f92d4 100644
--- a/src/chttpd/src/chttpd_view.erl
+++ b/src/chttpd/src/chttpd_view.erl
@@ -24,7 +24,7 @@ multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
         QueryArg = couch_mrview_http:parse_params(Query, undefined,
             Args1, [decoded]),
         QueryArg1 = couch_mrview_util:set_view_type(QueryArg, ViewName, Views),
-        couch_mrview_util:validate_args(QueryArg1)
+        fabric_util:validate_args(Db, DDoc, QueryArg1)
     end, Queries),
     Options = [{user_ctx, Req#httpd.user_ctx}],
     VAcc0 = #vacc{db=Db, req=Req, prepend="\r\n"},
diff --git a/src/chttpd/test/chttpd_db_bulk_get_test.erl b/src/chttpd/test/chttpd_db_bulk_get_test.erl
index f892131..e46c9c3 100644
--- a/src/chttpd/test/chttpd_db_bulk_get_test.erl
+++ b/src/chttpd/test/chttpd_db_bulk_get_test.erl
@@ -95,7 +95,7 @@ should_get_doc_with_all_revs(Pid) ->
     DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}},
 
     mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -115,7 +115,7 @@ should_validate_doc_with_bad_id(Pid) ->
     DocId = <<"_docudoc">>,
 
     Req = fake_request(DocId),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -138,7 +138,7 @@ should_validate_doc_with_bad_rev(Pid) ->
     Rev = <<"revorev">>,
 
     Req = fake_request(DocId, Rev),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -162,7 +162,7 @@ should_validate_missing_doc(Pid) ->
 
     Req = fake_request(DocId, Rev),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -186,7 +186,7 @@ should_validate_bad_atts_since(Pid) ->
 
     Req = fake_request(DocId, Rev, <<"badattsince">>),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -210,7 +210,7 @@ should_include_attachments_when_atts_since_specified(_) ->
 
     Req = fake_request(DocId, Rev, [<<"1-abc">>]),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     ?_assert(meck:called(fabric, open_revs,
                          [nil, DocId, [{1, <<"revorev">>}],
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea224b1..c1ff26b 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -133,7 +133,9 @@ make_group_fun(Bt, exact) ->
     end;
 make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
     fun
-        ({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
+        GF({{p, Partition, Key1}, Val1}, {{p, Partition, Key2}, Val2}) ->
+            GF({Key1, Val1}, {Key2, Val2});
+        GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
             SL1 = lists:sublist(Key1, GroupLevel),
             SL2 = lists:sublist(Key2, GroupLevel),
             case less(Bt, {SL1, nil}, {SL2, nil}) of
@@ -147,7 +149,7 @@ make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
                 _ ->
                     false
             end;
-        ({Key1, _}, {Key2, _}) ->
+        GF({Key1, _}, {Key2, _}) ->
             case less(Bt, {Key1, nil}, {Key2, nil}) of
                 false ->
                     case less(Bt, {Key2, nil}, {Key1, nil}) of
diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl
index 81adbb8..ca36c86 100644
--- a/src/couch/src/couch_ejson_compare.erl
+++ b/src/couch/src/couch_ejson_compare.erl
@@ -22,6 +22,10 @@ init() ->
     Dir = code:priv_dir(couch),
     ok = erlang:load_nif(filename:join(Dir, ?MODULE), NumScheds).
 
+% partitioned row comparison
+less({p, PA, A}, {p, PB, B}) ->
+    less([PA, A], [PB, B]);
+
 less(A, B) ->
     try
         less_nif(A, B)
diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl
index a341e30..e17aaba 100644
--- a/src/couch_mrview/include/couch_mrview.hrl
+++ b/src/couch_mrview/include/couch_mrview.hrl
@@ -20,6 +20,7 @@
     design_opts=[],
     seq_indexed=false,
     keyseq_indexed=false,
+    partitioned=false,
     lib,
     views,
     id_btree=nil,
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 94c6ff0..d2038c0 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -59,6 +59,7 @@ validate_ddoc_fields(DDoc) ->
         [{<<"options">>, object}],
         [{<<"options">>, object}, {<<"include_design">>, boolean}],
         [{<<"options">>, object}, {<<"local_seq">>, boolean}],
+        [{<<"options">>, object}, {<<"partitioned">>, boolean}],
         [{<<"rewrites">>, [string, array]}],
         [{<<"shows">>, object}, {any, [object, string]}],
         [{<<"updates">>, object}, {any, [object, string]}],
@@ -200,9 +201,19 @@ validate(Db,  DDoc) ->
     end,
     {ok, #mrst{
         language = Lang,
-        views = Views
+        views = Views,
+        partitioned = Partitioned
     }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
 
+    case {couch_db:is_partitioned(Db), Partitioned} of
+        {false, true} ->
+            throw({invalid_design_doc,
+                <<"partitioned option cannot be true in a "
+                  "non-partitioned database.">>});
+        {_, _} ->
+            ok
+    end,
+
     try Views =/= [] andalso couch_query_servers:get_os_process(Lang) of
         false ->
             ok;
@@ -230,7 +241,7 @@ query_all_docs(Db, Args0, Callback, Acc) ->
         couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Info)))
     end),
     Args1 = Args0#mrargs{view_type=map},
-    Args2 = couch_mrview_util:validate_args(Args1),
+    Args2 = couch_mrview_util:validate_all_docs_args(Db, Args1),
     {ok, Acc1} = case Args2#mrargs.preflight_fun of
         PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc);
         _ -> {ok, Acc}
@@ -616,6 +627,8 @@ red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) ->
     end, Acc, OptList),
     finish_fold(Acc2, []).
 
+red_fold({p, _Partition, Key}, Red, Acc) ->
+    red_fold(Key, Red, Acc);
 red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 ->
     {ok, Acc#mracc{skip=N-1, last_go=ok}};
 red_fold(Key, Red, #mracc{meta_sent=false}=Acc) ->
diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl
index 004caef..d8b333a 100644
--- a/src/couch_mrview/src/couch_mrview_http.erl
+++ b/src/couch_mrview/src/couch_mrview_http.erl
@@ -296,7 +296,7 @@ multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
     {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0),
     ArgQueries = lists:map(fun({Query}) ->
         QueryArg = parse_params(Query, undefined, Args1),
-        couch_mrview_util:validate_args(QueryArg)
+        couch_mrview_util:validate_args(Db, DDoc, QueryArg)
     end, Queries),
     {ok, Resp2} = couch_httpd:etag_maybe(Req, fun() ->
         Max = chttpd:chunked_response_buffer_size(),
@@ -582,6 +582,9 @@ parse_param(Key, Val, Args, IsDecoded) ->
             Args#mrargs{callback=couch_util:to_binary(Val)};
         "sorted" ->
             Args#mrargs{sorted=parse_boolean(Val)};
+        "partition" ->
+            Partition = couch_util:to_binary(Val),
+            couch_mrview_util:set_extra(Args, partition, Partition);
         _ ->
             BKey = couch_util:to_binary(Key),
             BVal = couch_util:to_binary(Val),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 4718b56..2815c91 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -40,10 +40,12 @@ get(update_options, #mrst{design_opts = Opts}) ->
     LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
     SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false),
     KeySeqIndexed = couch_util:get_value(<<"keyseq_indexed">>, Opts, false),
+    Partitioned = couch_util:get_value(<<"partitioned">>, Opts, false),
     if IncDesign -> [include_design]; true -> [] end
         ++ if LocalSeq -> [local_seq]; true -> [] end
         ++ if KeySeqIndexed -> [keyseq_indexed]; true -> [] end
-        ++ if SeqIndexed -> [seq_indexed]; true -> [] end;
+        ++ if SeqIndexed -> [seq_indexed]; true -> [] end
+        ++ if Partitioned -> [partitioned]; true -> [] end;
 get(fd, #mrst{fd = Fd}) ->
     Fd;
 get(language, #mrst{language = Language}) ->
@@ -94,14 +96,15 @@ get(Other, _) ->
 
 
 init(Db, DDoc) ->
-    couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc).
+    {ok, State} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+    {ok, set_partitioned(Db, State)}.
 
 
-open(Db, State) ->
+open(Db, State0) ->
     #mrst{
         db_name=DbName,
         sig=Sig
-    } = State,
+    } = State = set_partitioned(Db, State0),
     IndexFName = couch_mrview_util:index_file(DbName, Sig),
 
     % If we are upgrading from <=1.2.x, we upgrade the view
@@ -264,6 +267,29 @@ get_ddoc(DbName, DesignDocs, DDocId) ->
     end).
 
 
+set_partitioned(Db, State) ->
+    #mrst{
+        design_opts = DesignOpts
+    } = State,
+    DbPartitioned = couch_db:is_partitioned(Db),
+    ViewPartitioned = proplists:get_value(<<"partitioned">>, DesignOpts),
+    IsPartitioned = case {DbPartitioned, ViewPartitioned} of
+        {true, undefined} ->
+            true;
+        {true, true} ->
+            true;
+        {true, false} ->
+            false;
+        {false, undefined} ->
+            false;
+        {false, false} ->
+            false;
+        _ ->
+            throw({bad_request, <<"invalid partition option">>})
+    end,
+    State#mrst{partitioned = IsPartitioned}.
+
+
 ensure_local_purge_docs(DbName, DDocs) ->
     couch_util:with_db(DbName, fun(Db) ->
         lists:foreach(fun(DDoc) ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3383b49..fdfac0e 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -315,7 +315,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
     #mrst{
         id_btree=IdBtree,
         log_btree=LogBtree,
-        first_build=FirstBuild
+        first_build=FirstBuild,
+        partitioned=Partitioned
     } = State,
 
     Revs = dict:from_list(dict:fetch_keys(Log0)),
@@ -332,8 +333,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
     end,
 
-    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
         #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+        KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end,
         ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
         {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
         NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -382,6 +384,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         log_btree=LogBtree2
     }.
 
+inject_partition(KVs) ->
+    [{{{p, partition(DocId), Key}, DocId}, Value} || {{Key, DocId}, Value} <- KVs].
+
+partition(DocId) ->
+    [Partition, _Rest] = binary:split(DocId, <<":">>),
+    Partition.
+
 update_id_btree(Btree, DocIdKeys, true) ->
     ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
     couch_btree:query_modify(Btree, [], ToAdd, []);
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 4fd82e0..930f68a 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -26,12 +26,13 @@
 -export([temp_view_to_ddoc/1]).
 -export([calculate_external_size/1]).
 -export([calculate_active_size/1]).
--export([validate_args/1]).
+-export([validate_all_docs_args/2, validate_args/1, validate_args/3]).
 -export([maybe_load_doc/3, maybe_load_doc/4]).
 -export([maybe_update_index_file/1]).
 -export([extract_view/4, extract_view_reduce/1]).
 -export([get_view_keys/1, get_view_queries/1]).
 -export([set_view_type/3]).
+-export([set_extra/3, get_extra/2, get_extra/3]).
 -export([changes_key_opts/2]).
 -export([fold_changes/4]).
 -export([to_key_seq/1]).
@@ -39,6 +40,12 @@
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
 -define(GET_VIEW_RETRY_DELAY, 50).
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {<<255, 255, 255, 255>>}).
+-define(PARTITION_START(P), <<P/binary, $:>>).
+-define(PARTITION_END(P), <<P/binary, $;>>).
+-define(LOWEST(A, B), (if A < B -> A; true -> B end)).
+-define(HIGHEST(A, B), (if A > B -> A; true -> B end)).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -94,7 +101,7 @@ get_view(Db, DDoc, ViewName, Args0) ->
 get_view_index_pid(Db, DDoc, ViewName, Args0) ->
     ArgCheck = fun(InitState) ->
         Args1 = set_view_type(Args0, ViewName, InitState#mrst.views),
-        {ok, validate_args(Args1)}
+        {ok, validate_args(InitState, Args1)}
     end,
     couch_index_server:get_index(?MOD, Db, DDoc, ArgCheck).
 
@@ -169,6 +176,7 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
     {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
     SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
     KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false),
+    Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false),
 
     {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
     BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
@@ -189,7 +197,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
         language=Language,
         design_opts=DesignOpts,
         seq_indexed=SeqIndexed,
-        keyseq_indexed=KeySeqIndexed
+        keyseq_indexed=KeySeqIndexed,
+        partitioned=Partitioned
     },
     SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
     {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.
@@ -213,6 +222,19 @@ set_view_type(Args, ViewName, [View | Rest]) ->
     end.
 
 
+set_extra(#mrargs{} = Args, Key, Value) ->
+    Extra0 = Args#mrargs.extra,
+    Extra1 = lists:ukeysort(1, [{Key, Value} | Extra0]),
+    Args#mrargs{extra = Extra1}.
+
+
+get_extra(#mrargs{} = Args, Key) ->
+    couch_util:get_value(Key, Args#mrargs.extra).
+
+get_extra(#mrargs{} = Args, Key, Default) ->
+    couch_util:get_value(Key, Args#mrargs.extra, Default).
+
+
 extract_view(_Lang, _Args, _ViewName, []) ->
     throw({not_found, missing_named_view});
 extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Rest]) ->
@@ -476,6 +498,45 @@ fold_reduce({NthRed, Lang, View}, Fun,  Acc, Options) ->
     couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
 
 
+validate_args(Db, DDoc, Args) ->
+    {ok, State} = couch_mrview_index:init(Db, DDoc),
+    validate_args(State, Args).
+
+
+validate_args(#mrst{} = State, Args0) ->
+    Args = validate_args(Args0),
+
+    ViewPartitioned = State#mrst.partitioned,
+    Partition = get_extra(Args, partition),
+
+    case {ViewPartitioned, Partition} of
+        {true, undefined} ->
+            mrverror(<<"`partition` parameter is mandatory for this view.">>);
+        {true, _} ->
+            apply_partition(Args, Partition);
+        {false, undefined} ->
+            Args;
+        {false, Value} when is_binary(Value) ->
+            mrverror(<<"`partition` parameter is not supported on this view">>)
+    end.
+
+
+validate_all_docs_args(Db, Args0) ->
+    Args = validate_args(Args0),
+
+    DbPartitioned = couch_db:is_partitioned(Db),
+    Partition = get_extra(Args, partition),
+
+    case {DbPartitioned, Partition} of
+        {false, <<_/binary>>} ->
+            mrverror(<<"`partition` paramter is not support on this db">>);
+        {_, <<_/binary>>} ->
+            apply_all_docs_partition(Args, Partition);
+        _ ->
+            Args
+    end.
+
+
 validate_args(Args) ->
     GroupLevel = determine_group_level(Args),
     Reduce = Args#mrargs.reduce,
@@ -598,6 +659,12 @@ validate_args(Args) ->
         _ -> mrverror(<<"Invalid value for `sorted`.">>)
     end,
 
+    case get_extra(Args, partition) of
+        undefined -> ok;
+        Partition when is_binary(Partition) -> ok;
+        _ -> mrverror(<<"Invalid value for `partition`.">>)
+    end,
+
     Args#mrargs{
         start_key_docid=SKDocId,
         end_key_docid=EKDocId,
@@ -616,6 +683,62 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) ->
 determine_group_level(#mrargs{group_level=GroupLevel}) ->
     GroupLevel.
 
+apply_partition(#mrargs{keys=[{p, _, _} | _]} = Args, _Partition) ->
+    Args; % already applied
+
+apply_partition(#mrargs{keys=Keys} = Args, Partition) when Keys /= undefined ->
+    Args#mrargs{keys=[{p, Partition, K} || K <- Keys]};
+
+apply_partition(#mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args, _Partition) ->
+    Args; % already applied.
+
+apply_partition(Args, Partition) ->
+    #mrargs{
+        direction = Dir,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+
+    {DefSK, DefEK} = case Dir of
+        fwd -> {?LOWEST_KEY, ?HIGHEST_KEY};
+        rev -> {?HIGHEST_KEY, ?LOWEST_KEY}
+    end,
+
+    SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end,
+    EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end,
+
+    Args#mrargs{
+        start_key = {p, Partition, SK0},
+        end_key = {p, Partition, EK0}
+    }.
+
+%% all_docs is special as it's not really a view and is already
+%% effectively partitioned as the partition is a prefix of all keys.
+apply_all_docs_partition(#mrargs{} = Args, Partition) ->
+    #mrargs{
+        direction = Dir,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+
+    {DefSK, DefEK} = case Dir of
+        fwd -> {?PARTITION_START(Partition), ?PARTITION_END(Partition)};
+        rev -> {?PARTITION_END(Partition), ?PARTITION_START(Partition)}
+    end,
+
+    SK0 = if StartKey == undefined -> DefSK; true -> StartKey end,
+    EK0 = if EndKey == undefined -> DefEK; true -> EndKey end,
+
+    {SK1, EK1} = case Dir of
+        fwd -> {?HIGHEST(DefSK, SK0), ?LOWEST(DefEK, EK0)};
+        rev -> {?LOWEST(DefSK, SK0), ?HIGHEST(DefEK, EK0)}
+    end,
+
+    Args#mrargs{
+        start_key = SK1,
+        end_key = EK1
+    }.
+
 
 check_range(#mrargs{start_key=undefined}, _Cmp) ->
     ok;
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index e796c91..c62f780 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -381,10 +381,11 @@ query_view(Db, Options, GroupId, ViewName, Callback, Acc0, QueryArgs)
         when is_binary(GroupId) ->
     DbName = dbname(Db),
     {ok, DDoc} = ddoc_cache:open(DbName, <<"_design/", GroupId/binary>>),
-    query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs);
-query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
-    Db = dbname(DbName), View = name(ViewName),
-    case fabric_util:is_users_db(Db) of
+    query_view(Db, Options, DDoc, ViewName, Callback, Acc0, QueryArgs);
+query_view(Db342, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
+    DbName = dbname(Db342),
+    View = name(ViewName),
+    case fabric_util:is_users_db(DbName) of
     true ->
         FakeDb = fabric_util:make_cluster_db(DbName, Options),
         couch_users_db:after_doc_read(DDoc, FakeDb);
@@ -392,14 +393,14 @@ query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
         ok
     end,
     {ok, #mrst{views=Views, language=Lang}} =
-        couch_mrview_util:ddoc_to_mrst(Db, DDoc),
+        couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
     QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views),
-    QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1),
+    QueryArgs2 = fabric_util:validate_args(Db342, DDoc, QueryArgs1),
     VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views),
     case is_reduce_view(QueryArgs2) of
         true ->
             fabric_view_reduce:go(
-                Db,
+                Db342,
                 DDoc,
                 View,
                 QueryArgs2,
@@ -409,7 +410,7 @@ query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
             );
         false ->
             fabric_view_map:go(
-                Db,
+                Db342,
                 Options,
                 DDoc,
                 View,
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 298921b..80918e7 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -21,6 +21,7 @@
 -export([is_users_db/1, is_replicator_db/1]).
 -export([make_cluster_db/1, make_cluster_db/2]).
 -export([is_partitioned/1]).
+-export([validate_all_docs_args/2, validate_args/3]).
 -export([upgrade_mrargs/1]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
@@ -66,7 +67,6 @@ stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{workers=Workers}} ->
-            true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
                 [Worker | WorkerAcc]
@@ -336,6 +336,26 @@ is_partitioned(Db) ->
     couch_db:is_partitioned(Db).
 
 
+validate_all_docs_args(DbName, Args) when is_binary(DbName) ->
+    Shards = mem3:shards(fabric:dbname(DbName)),
+    Db = make_cluster_db(hd(Shards)),
+    validate_all_docs_args(Db, Args);
+
+validate_all_docs_args(Db, Args) ->
+    true = couch_db:is_clustered(Db),
+    couch_mrview_util:validate_all_docs_args(Db, Args).
+
+
+validate_args(DbName, DDoc, Args) when is_binary(DbName) ->
+    Shards = mem3:shards(fabric:dbname(DbName)),
+    Db = make_cluster_db(hd(Shards)),
+    validate_args(Db, DDoc, Args);
+
+validate_args(Db, DDoc, Args) ->
+    true = couch_db:is_clustered(Db),
+    couch_mrview_util:validate_args(Db, DDoc, Args).
+
+
 upgrade_mrargs(#mrargs{} = Args) ->
     Args;
 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 69f4290..81eb6f0 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -128,8 +128,11 @@ maybe_send_row(State) ->
         try get_next_row(State) of
         {_, NewState} when Skip > 0 ->
             maybe_send_row(NewState#collector{skip=Skip-1});
-        {Row, NewState} ->
-            case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+        {Row0, NewState} ->
+            Row1 = possibly_embed_doc(NewState, Row0),
+            Row2 = detach_partition(Row1),
+            Row3 = transform_row(Row2),
+            case Callback(Row3, AccIn) of
             {stop, Acc} ->
                 {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
             {ok, Acc} ->
@@ -194,6 +197,10 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
         _ -> Row
     end.
 
+detach_partition(#view_row{key={p, _Partition, Key}} = Row) ->
+    Row#view_row{key = Key};
+detach_partition(#view_row{} = Row) ->
+    Row.
 
 keydict(undefined) ->
     undefined;
@@ -309,10 +316,23 @@ index_of(X, [X|_Rest], I) ->
 index_of(X, [_|Rest], I) ->
     index_of(X, Rest, I+1).
 
-get_shards(DbName, #mrargs{stable=true}) ->
-    mem3:ushards(DbName);
-get_shards(DbName, #mrargs{stable=false}) ->
-    mem3:shards(DbName).
+get_shards(Db, #mrargs{} = Args) ->
+    DbPartitioned = fabric_util:is_partitioned(Db),
+    Partition = couch_mrview_util:get_extra(Args, partition),
+    if DbPartitioned orelse Partition == undefined -> ok; true ->
+        throw({bad_request, <<"partition specified on non-partitioned db">>})
+    end,
+    DbName = fabric:dbname(Db),
+    case {Args#mrargs.stable, Partition} of
+        {true, undefined} ->
+            mem3:ushards(DbName);
+        {true, Partition} ->
+            mem3:ushards(DbName, <<Partition/binary, ":foo">>);
+        {false, undefined} ->
+            mem3:shards(DbName);
+        {false, Partition} ->
+            mem3:shards(DbName, <<Partition/binary, ":foo">>)
+    end.
 
 maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
     #mrargs{update=lazy} = Args) ->
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index ac16dac..6acc792 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -20,8 +20,9 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
-    Shards = mem3:shards(DbName),
+go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    DbName = fabric:dbname(Db),
+    Shards = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
             Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f..1648623 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -24,8 +24,9 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo);
 
-go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
-    Shards = fabric_view:get_shards(DbName, Args),
+go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
+    DbName = fabric:dbname(Db),
+    Shards = fabric_view:get_shards(Db, Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be10..7acc67c 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -23,10 +23,11 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
 
-go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
+go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
+    DbName = fabric:dbname(Db),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     RPCArgs = [DocIdAndRev, VName, Args],
-    Shards = fabric_view:get_shards(DbName, Args),
+    Shards = fabric_view:get_shards(Db, Args),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


Mime
View raw message