couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject couch-replicator commit: updated refs/heads/master to 9865499
Date Mon, 18 Apr 2016 17:41:05 GMT
Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master 9ee3e19b9 -> 986549984


Implement Mango selectors for replication

A replication document may now include a "selector"
field with a Mango selector JSON object
as the value.

For example:
```
{
    "_id": "r",
    "continuous": true,
    "selector": {
        "_id": {
            "$gte": "d2"
        }
    },
    "source": "http://adm:pass@localhost:15984/a",
    "target": "http://adm:pass@localhost:15984/b"
}
```

Under the hood, this feature uses the Mango selector
filtering capability of the _changes feed.

The replicator docs JS validation function has been
updated to return an error if the user has
specified both `doc_ids` and `selector`, or has
specified `filter` together with either of the other
two.

Replication options parsing also checks for these
mutually exclusive fields, as replications can be
started from the `_replicate` endpoint, not just
via the docs in `*_replicator` dbs.

When generating a replication id, the Mango selector
object is normalized and sorted (fields are sorted by
key inside objects only; array order is preserved).
That is done in order to reduce the chance of creating
two different replication checkpoints for the same
Mango selector.

Jira: COUCHDB-2988


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/98654998
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/98654998
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/98654998

Branch: refs/heads/master
Commit: 986549984be44c7e5cd66f3773d2bdd94f506e5c
Parents: 9ee3e19
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Fri Apr 15 18:04:17 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Mon Apr 18 11:28:53 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl        |  25 ++++--
 src/couch_replicator_js_functions.hrl    |  19 ++++
 src/couch_replicator_utils.erl           | 110 ++++++++++++++++++++---
 test/couch_replicator_selector_tests.erl | 121 ++++++++++++++++++++++++++
 4 files changed, 255 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/98654998/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b7d3bb6..ff6b00c 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -469,11 +469,16 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
         {"timeout", integer_to_list(Timeout)}
            ],
     DocIds = get_value(doc_ids, Options),
-    {QArgs, Method, Body, Headers} = case DocIds of
-    undefined ->
+    Selector = get_value(selector, Options),
+    {QArgs, Method, Body, Headers} = case {DocIds, Selector} of
+    {undefined, undefined} ->
         QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
         {QArgs1, get, [], Headers1};
-    _ when is_list(DocIds) ->
+    {undefined, _} when is_tuple(Selector) ->
+        Headers2 = [{"Content-Type", "application/json"} | Headers1],
+        JsonSelector = ?JSON_ENCODE({[{<<"selector">>, Selector}]}),
+        {[{"filter", "_selector"} | BaseQArgs], post, JsonSelector, Headers2};
+    {_, undefined} when is_list(DocIds) ->
         Headers2 = [{"Content-Type", "application/json"} | Headers1],
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
@@ -506,11 +511,15 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                     end)
         end);
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
-    Filter = case get_value(doc_ids, Options) of
-    undefined ->
+    DocIds = get_value(doc_ids, Options),
+    Selector = get_value(selector, Options),
+    Filter = case {DocIds, Selector} of
+    {undefined, undefined} ->
         ?b2l(get_value(filter, Options, <<>>));
-    _DocIds ->
-        "_doc_ids"
+    {_, undefined} ->
+        "_doc_ids";
+    {undefined, _} ->
+        "_selector"
     end,
     Args = #changes_args{
         style = Style,
@@ -580,6 +589,8 @@ changes_json_req(_Db, "", _QueryParams, _Options) ->
     {[]};
 changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
     {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(_Db, "_selector", _QueryParams, Options) ->
+    {[{<<"selector">>, get_value(selector, Options)}]};
 changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
     {ok, Info} = couch_db:get_db_info(Db),
     % simulate a request to db_name/_changes

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/98654998/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_js_functions.hrl b/src/couch_replicator_js_functions.hrl
index 3f1db7c..f3f7ab6 100644
--- a/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator_js_functions.hrl
@@ -81,12 +81,31 @@
                 reportError('The `doc_ids\\' field must be an array of strings.');
             }
 
+            if ((typeof newDoc.selector !== 'undefined') &&
+                (typeof newDoc.selector !== 'object')) {
+
+                reportError('The `selector\\' field must be an object.');
+            }
+
             if ((typeof newDoc.filter !== 'undefined') &&
                 ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
 
                 reportError('The `filter\\' field must be a non-empty string.');
             }
 
+            if ((typeof newDoc.doc_ids !== 'undefined') &&
+                (typeof newDoc.selector !== 'undefined')) {
+
+                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
+            }
+
+            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
+                  (typeof newDoc.selector !== 'undefined')) &&
+                 (typeof newDoc.filter !== 'undefined') ) {
+
+                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
+            }
+
             if ((typeof newDoc.query_params !== 'undefined') &&
                 ((typeof newDoc.query_params !== 'object') ||
                     newDoc.query_params === null)) {

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/98654998/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index dde30f6..76bc8e1 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -115,20 +115,24 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
 
 maybe_append_filters(Base,
         #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+    Filter = get_value(filter, Options),
+    DocIds = get_value(doc_ids, Options),
+    Selector = get_value(selector, Options),
     Base2 = Base ++
-        case get_value(filter, Options) of
-        undefined ->
-            case get_value(doc_ids, Options) of
-            undefined ->
-                [];
-            DocIds ->
-                [DocIds]
-            end;
-        <<"_", _/binary>> = Filter ->
-                [Filter, get_value(query_params, Options, {[]})];
-        Filter ->
+        case {Filter, DocIds, Selector} of
+        {undefined, undefined, undefined} ->
+            [];
+        {<<"_", _/binary>>, undefined, undefined} ->
+            [Filter, get_value(query_params, Options, {[]})];
+        {_, undefined, undefined} ->
             [filter_code(Filter, Source, UserCtx),
-                get_value(query_params, Options, {[]})]
+                get_value(query_params, Options, {[]})];
+        {undefined, _, undefined} ->
+            [DocIds];
+        {undefined, undefined, _} ->
+            [ejsort(mango_selector:normalize(Selector))];
+        _ ->
+            throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>})
         end,
     couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
 
@@ -248,7 +252,8 @@ maybe_add_trailing_slash(Url) ->
 
 
 make_options(Props) ->
-    Options = lists:ukeysort(1, convert_options(Props)),
+    Options0 = lists:ukeysort(1, convert_options(Props)),
+    Options = check_options(Options0),
     DefWorkers = config:get("replicator", "worker_processes", "4"),
     DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
     DefConns = config:get("replicator", "http_connections", "20"),
@@ -296,6 +301,10 @@ convert_options([{<<"doc_ids">>, V} | R]) ->
     % encoded doc IDs.
     DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
     [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
+convert_options([{<<"selector">>, V} | R]) ->
+    [{selector, V} | convert_options(R)];
 convert_options([{<<"worker_processes">>, V} | R]) ->
     [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"worker_batch_size">>, V} | R]) ->
@@ -318,6 +327,19 @@ convert_options([{<<"checkpoint_interval">>, V} | R]) ->
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
 
+check_options(Options) ->
+    DocIds = lists:keyfind(doc_ids, 1, Options),
+    Filter = lists:keyfind(filter, 1, Options),
+    Selector = lists:keyfind(selector, 1, Options),
+    case {DocIds, Filter, Selector} of
+        {false, false, false} -> Options;
+        {false, false, _} -> Options;
+        {false, _, false} -> Options;
+        {_, false, false} -> Options;
+        _ ->
+            throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
+    end.
+
 
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
@@ -458,3 +480,65 @@ is_deleted(Change) ->
     Else ->
         Else
     end.
+
+
+% Sort an EJSON object's properties to attempt
+% to generate a unique representation. This is used
+% to reduce the chance of getting different
+% replication checkpoints for the same Mango selector
+ejsort({V})->
+    ejsort_props(V, []);
+ejsort(V) when is_list(V) ->
+    ejsort_array(V, []);
+ejsort(V) ->
+    V.
+
+ejsort_props([], Acc)->
+    {lists:keysort(1, Acc)};
+ejsort_props([{K, V}| R], Acc) ->
+    ejsort_props(R, [{K, ejsort(V)} | Acc]).
+
+ejsort_array([], Acc)->
+    lists:reverse(Acc);
+ejsort_array([V | R], Acc) ->
+    ejsort_array(R, [ejsort(V) | Acc]).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+ejsort_basic_values_test() ->
+    ?assertEqual(ejsort(0), 0),
+    ?assertEqual(ejsort(<<"a">>), <<"a">>),
+    ?assertEqual(ejsort(true), true),
+    ?assertEqual(ejsort([]), []),
+    ?assertEqual(ejsort({[]}), {[]}).
+
+ejsort_compound_values_test() ->
+    ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
+    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
+    Ej1s =  {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
+    ?assertEqual(ejsort(Ej1), Ej1s),
+    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
+    ?assertEqual(ejsort(Ej2),
+        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
+
+check_options_pass_values_test() ->
+    ?assertEqual(check_options([]), []),
+    ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]),
+    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
+    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
+    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
+check_options_fail_values_test() ->
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {filter, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{filter, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/98654998/test/couch_replicator_selector_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_replicator_selector_tests.erl b/test/couch_replicator_selector_tests.erl
new file mode 100644
index 0000000..98c6099
--- /dev/null
+++ b/test/couch_replicator_selector_tests.erl
@@ -0,0 +1,121 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_selector_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup(_) ->
+    Ctx = test_util:start_couch([couch_replicator]),
+    Source = create_db(),
+    create_docs(Source),
+    Target = create_db(),
+    {Ctx, {Source, Target}}.
+
+teardown(_, {Ctx, {Source, Target}}) ->
+    delete_db(Source),
+    delete_db(Target),
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
+
+selector_replication_test_() ->
+    Pairs = [{local, local}, {local, remote},
+             {remote, local}, {remote, remote}],
+    {
+        "Selector filtered replication tests",
+        {
+            foreachx,
+            fun setup/1, fun teardown/2,
+            [{Pair, fun should_succeed/2} || Pair <- Pairs]
+        }
+    }.
+
+should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
+    RepObject = {[
+        {<<"source">>, db_url(From, Source)},
+        {<<"target">>, db_url(To, Target)},
+        {<<"selector">>, {[{<<"_id">>, <<"doc2">>}]}}
+    ]},
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    %% FilteredFun is an Erlang version of following mango selector
+    FilterFun = fun(_DocId, {Props}) ->
+        couch_util:get_value(<<"_id">>, Props) == <<"doc2">>
+    end,
+    {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
+        {"Target DB has proper number of docs",
+        ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))},
+        {"All the docs selected as expected",
+        ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
+    ]}.
+
+compare_dbs(Source, Target, FilterFun) ->
+    {ok, SourceDb} = couch_db:open_int(Source, []),
+    {ok, TargetDb} = couch_db:open_int(Target, []),
+    {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
+        TargetReply = read_doc(TargetDb, DocId),
+        case FilterFun(DocId, SourceDoc) of
+            true ->
+                ValidReply = {ok, DocId, SourceDoc} == TargetReply,
+                {ok, [ValidReply|Acc]};
+            false ->
+                ValidReply = {not_found, missing} == TargetReply,
+                {ok, [ValidReply|Acc]}
+        end
+    end,
+    {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb),
+    {ok, TargetDbInfo, AllReplies}.
+
+read_doc(Db, DocIdOrInfo) ->
+    case couch_db:open_doc(Db, DocIdOrInfo) of
+        {ok, Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, [attachments]),
+            DocId = couch_util:get_value(<<"_id">>, Props),
+            {ok, DocId, {Props}};
+        Error ->
+            Error
+    end.
+
+create_db() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    DbName.
+
+create_docs(DbName) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    Doc1 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc1">>}
+    ]}),
+    Doc2 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc2">>}
+    ]}),
+    {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2]),
+    couch_db:ensure_full_commit(Db),
+    couch_db:close(Db).
+
+delete_db(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+db_url(local, DbName) ->
+    DbName;
+db_url(remote, DbName) ->
+    Addr = config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).


Mime
View raw message