Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4BD48175E7 for ; Fri, 31 Oct 2014 19:53:17 +0000 (UTC) Received: (qmail 2686 invoked by uid 500); 31 Oct 2014 19:53:16 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 2566 invoked by uid 500); 31 Oct 2014 19:53:16 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 2163 invoked by uid 99); 31 Oct 2014 19:53:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Oct 2014 19:53:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 6A4D9927A86; Fri, 31 Oct 2014 19:53:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bbastian@apache.org To: commits@couchdb.apache.org Date: Fri, 31 Oct 2014 19:53:24 -0000 Message-Id: In-Reply-To: <695c7c23941a44a0938af77efbf99d8b@git.apache.org> References: <695c7c23941a44a0938af77efbf99d8b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/41] couch-mrview commit: updated refs/heads/master to 28e51f3 couch_mrview: couch_mrview_changes:handle_changes Similar to couch_changes:handle_changes but for view changes. It add support for longpolling, normal and continuous stream The API differs from the one for doc by beeing independant from the transport: the support of HTTP will be added on top for example. This API will be also used to replace the view filter in the current _changes API. Also add unittests. Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/1c24c425 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/1c24c425 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/1c24c425 Branch: refs/heads/master Commit: 1c24c425f2ec9fa63b0e01a13673af234043ee30 Parents: 18b5f6f Author: benoitc Authored: Fri Jan 31 13:13:23 2014 +0100 Committer: Benjamin Bastian Committed: Thu Oct 30 13:38:33 2014 -0700 ---------------------------------------------------------------------- src/couch_mrview_changes.erl | 173 ++++++++++++++++++++++++++++++++ src/couch_mrview_test_util.erl | 2 + test/09-index-events.t | 17 +++- test/10-index-changes.t | 194 ++++++++++++++++++++++++++++++++++++ 4 files changed, 385 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/src/couch_mrview_changes.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview_changes.erl b/src/couch_mrview_changes.erl new file mode 100644 index 0000000..2b8f910 --- /dev/null +++ b/src/couch_mrview_changes.erl @@ -0,0 +1,173 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. +% +-module(couch_mrview_changes). + +-export([handle_changes/6]). + +-include_lib("couch/include/couch_db.hrl"). + +-record(vst, {dbname, + ddoc, + view, + view_options, + since, + callback, + acc, + user_timeout, + timeout, + heartbeat, + timeout_acc=0, + notifier, + stream}). + +-type changes_stream() :: true | false | once. +-type changes_options() :: [{stream, changes_stream()} | + {since, integer()} | + {view_options, list()} | + {timeout, integer()} | + {heartbeat, true | integer()}]. + +-export_type([changes_stream/0]). +-export_type([changes_options/0]). + +%% @doc function returning changes in a streaming fashion if needed. +-spec handle_changes(binary(), binary(), binary(), function(), term(), + changes_options()) -> ok | {error, term()}. +handle_changes(DbName, DDocId, View, Fun, Acc, Options) -> + Since = proplists:get_value(since, Options, 0), + Stream = proplists:get_value(stream, Options, false), + ViewOptions = proplists:get_value(view_options, Options, []), + + State0 = #vst{dbname=DbName, + ddoc=DDocId, + view=View, + view_options=ViewOptions, + since=Since, + callback=Fun, + acc=Acc}, + + case view_changes_since(State0) of + {ok, #vst{since=LastSeq, acc=Acc2}=State} -> + case Stream of + true -> + start_loop(State#vst{stream=true}, Options); + once when LastSeq =:= Since -> + start_loop(State#vst{stream=once}, Options); + _ -> + Fun(stop, {LastSeq, Acc2}) + end; + {stop, #vst{since=LastSeq, acc=Acc2}} -> + Fun(stop, {LastSeq, Acc2}); + Error -> + Error + end. + +start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) -> + {UserTimeout, Timeout, Heartbeat} = changes_timeout(Options), + Notifier = index_update_notifier(DbName, DDocId), + try + loop(State#vst{notifier=Notifier, + user_timeout=UserTimeout, + timeout=Timeout, + heartbeat=Heartbeat}) + after + couch_index_event:stop(Notifier) + end. + +loop(#vst{since=Since, callback=Callback, acc=Acc, + user_timeout=UserTimeout, timeout=Timeout, + heartbeat=Heartbeat, timeout_acc=TimeoutAcc, + stream=Stream}=State) -> + receive + index_update -> + case view_changes_since(State) of + {ok, State2} when Stream =:= true -> + loop(State2#vst{timeout_acc=0}); + {ok, #vst{since=LastSeq, acc=Acc2}} -> + Callback(stop, {LastSeq, Acc2}); + {stop, #vst{since=LastSeq, acc=Acc2}} -> + Callback(stop, {LastSeq, Acc2}) + end; + index_delete -> + Callback(stop, {Since, Acc}) + after Timeout -> + TimeoutAcc2 = TimeoutAcc + Timeout, + case UserTimeout =< TimeoutAcc2 of + true -> + Callback(stop, {Since, Acc}); + false when Heartbeat =:= true -> + case Callback(heartbeat, Acc) of + {ok, Acc2} -> + loop(State#vst{acc=Acc2, timeout_acc=TimeoutAcc2}); + {stop, Acc2} -> + Callback(stop, {Since, Acc2}) + end; + _ -> + Callback(stop, {Since, Acc}) + end + end. + +changes_timeout(Options) -> + DefaultTimeout = list_to_integer( + couch_config:get("httpd", "changes_timeout", "60000") + ), + UserTimeout = proplists:get_value(timeout, Options, DefaultTimeout), + {Timeout, Heartbeat} = case proplists:get_value(heartbeat, Options) of + undefined -> {UserTimeout, false}; + true -> + T = erlang:min(DefaultTimeout, UserTimeout), + {T, true}; + H -> + T = erlang:min(H, UserTimeout), + {T, true} + end, + {UserTimeout, Timeout, Heartbeat}. + +view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View, + view_options=Options, since=Since, + callback=Callback, acc=UserAcc}=State) -> + Wrapper = fun ({{Seq, _Key, _DocId}, _Val}=KV, {Go, Acc2, OldSeq}) -> + LastSeq = if OldSeq < Seq -> Seq; + true -> OldSeq + end, + + case Callback(KV, Acc2) of + {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}}; + {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}} + end + end, + + Acc0 = {ok, UserAcc, Since}, + case couch_mrview:view_changes_since(DbName, DDocId, View, Since, + Wrapper, Options, Acc0) of + {ok, {Go, UserAcc2, Since2}}-> + {Go, State#vst{since=Since2, acc=UserAcc2}}; + Error -> + Error + end. + +index_update_notifier(#db{name=DbName}, DDocId) -> + index_update_notifier(DbName, DDocId); +index_update_notifier(DbName, DDocId) -> + Self = self(), + {ok, NotifierPid} = couch_index_event:start_link(fun + ({index_update, {Name, Id, couch_mrview_index}}) + when Name =:= DbName, Id =:= DDocId -> + Self ! index_update; + ({index_delete, {Name, Id, couch_mrview_index}}) + when Name =:= DbName, Id =:= DDocId -> + Self ! index_delete; + (_) -> + ok + end), + NotifierPid. http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/src/couch_mrview_test_util.erl ---------------------------------------------------------------------- diff --git a/src/couch_mrview_test_util.erl b/src/couch_mrview_test_util.erl index c68010c..1d3d788 100644 --- a/src/couch_mrview_test_util.erl +++ b/src/couch_mrview_test_util.erl @@ -33,6 +33,8 @@ new_db(Name, Type) -> {ok, Db} = couch_db:create(Name, [?ADMIN_USER]), save_docs(Db, [ddoc(Type)]). +delete_db(Name) -> + couch_server:delete(Name, [{user_ctx, ?ADMIN}]). save_docs(Db, Docs) -> {ok, _} = couch_db:update_docs(Db, Docs, []), http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/test/09-index-events.t ---------------------------------------------------------------------- diff --git a/test/09-index-events.t b/test/09-index-events.t index 90654b8..1489e4e 100644 --- a/test/09-index-events.t +++ b/test/09-index-events.t @@ -15,7 +15,7 @@ % the License. main(_) -> - etap:plan(2), + etap:plan(4), case (catch test()) of ok -> etap:end_tests(); @@ -30,6 +30,7 @@ test() -> test_util:start_couch(), {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes), test_update_event(Db), + test_delete_event(Db), test_util:stop_couch(), ok. @@ -44,3 +45,17 @@ test_update_event(Db) -> etap:is(Event, Expect, "index update events OK") end, couch_index_event:stop(Pid). + +test_delete_event(Db) -> + ok = couch_mrview:refresh(Db, <<"_design/bar">>), + {ok, Pid} = couch_index_event:start_link(self()), + + etap:ok(is_pid(Pid), "event handler added"), + couch_mrview_test_util:delete_db(<<"foo">>), + Expect = {index_delete, {<<"foo">>, <<"_design/bar">>, + couch_mrview_index}}, + receive + Event -> + etap:is(Event, Expect, "index delete events OK") + end, + couch_index_event:stop(Pid). http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/test/10-index-changes.t ---------------------------------------------------------------------- diff --git a/test/10-index-changes.t b/test/10-index-changes.t new file mode 100644 index 0000000..627376f --- /dev/null +++ b/test/10-index-changes.t @@ -0,0 +1,194 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap + +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +main(_) -> + etap:plan(6), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + timer:sleep(300), + ok. + +test() -> + test_util:start_couch(), + {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes), + test_normal_changes(Db), + test_stream_once(Db), + test_stream_once_since(Db), + test_stream_once_timeout(Db), + test_stream_once_heartbeat(Db), + test_stream(Db), + test_util:stop_couch(), + ok. + +test_normal_changes(Db) -> + Result = run_query(Db, []), + Expect = {ok, 11, [ + {{2, 1, <<"1">>}, 1}, + {{3, 10, <<"10">>}, 10}, + {{4, 2, <<"2">>}, 2}, + {{5, 3, <<"3">>}, 3}, + {{6, 4, <<"4">>}, 4}, + {{7, 5, <<"5">>}, 5}, + {{8, 6, <<"6">>}, 6}, + {{9, 7, <<"7">>}, 7}, + {{10, 8, <<"8">>}, 8}, + {{11, 9, <<"9">>}, 9} + ]}, + etap:is(Result, Expect, "normal changes worked."). + +test_stream_once(Db) -> + Result = run_query(Db, [{stream, once}]), + Expect = {ok, 11, [ + {{2, 1, <<"1">>}, 1}, + {{3, 10, <<"10">>}, 10}, + {{4, 2, <<"2">>}, 2}, + {{5, 3, <<"3">>}, 3}, + {{6, 4, <<"4">>}, 4}, + {{7, 5, <<"5">>}, 5}, + {{8, 6, <<"6">>}, 6}, + {{9, 7, <<"7">>}, 7}, + {{10, 8, <<"8">>}, 8}, + {{11, 9, <<"9">>}, 9} + ]}, + etap:is(Result, Expect, "stream once since 0 worked."). + + +test_stream_once_since(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 11}, + {stream, once}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(1000), + {ok, Db1} = save_doc(Db, 11), + couch_mrview:refresh(Db1, <<"_design/bar">>) + end), + + Expect = {ok,12,[{{12,11,<<"11">>},11}]}, + + receive + {result, Result} -> + etap:is(Result, Expect, "normal changes worked.") + after 5000 -> + io:format("never got the change", []) + end. + + +test_stream_once_timeout(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 12}, + {stream, once}, + {timeout, 3000}]), + Self ! {result, Result} + end), + + + + Expect = {ok, 12, []}, + + receive + {result, Result} -> + etap:is(Result, Expect, "got timeout.") + after 5000 -> + io:format("never got the change", []) + end. + +test_stream_once_heartbeat(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 12}, + {stream, once}, + {heartbeat, 1000}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(3000), + {ok, Db1} = save_doc(Db, 12), + couch_mrview:refresh(Db1, <<"_design/bar">>) + end), + + Expect = {ok,13,[heartbeat, + heartbeat, + heartbeat, + {{13,12,<<"12">>},12}]}, + + + + receive + {result, Result} -> + etap:is(Result, Expect, "heartbeat OK.") + after 5000 -> + io:format("never got the change", []) + end. + + +test_stream(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 13}, + stream, + {timeout, 3000}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(1000), + {ok, Db1} = save_doc(Db, 13), + couch_mrview:refresh(Db1, <<"_design/bar">>), + {ok, Db2} = save_doc(Db1, 14), + couch_mrview:refresh(Db2, <<"_design/bar">>) + end), + + Expect = {ok, 15,[{{14,13,<<"13">>},13}, + {{15,14,<<"14">>},14}]}, + + receive + {result, Result} -> + etap:is(Result, Expect, "stream OK.") + after 5000 -> + io:format("never got the change", []) + end. + + +save_doc(Db, Id) -> + Doc = couch_mrview_test_util:doc(Id), + {ok, _Rev} = couch_db:update_doc(Db, Doc, []), + {ok, _} = couch_db:ensure_full_commit(Db), + couch_db:reopen(Db). + +run_query(Db, Opts) -> + Fun = fun + (stop, {LastSeq, Acc}) -> + {ok, LastSeq, Acc}; + (heartbeat, Acc) -> + {ok, [heartbeat | Acc]}; + (Event, Acc) -> + {ok, [Event | Acc]} + end, + couch_mrview:refresh(Db, <<"_design/bar">>), + {ok, LastSeq, R} = couch_mrview_changes:handle_changes(Db, <<"_design/bar">>, + <<"baz">>, Fun, [], Opts), + {ok, LastSeq, lists:reverse(R)}.