couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r802416 - in /couchdb/trunk: src/couchdb/couch_rep_changes_feed.erl test/etap/110-replication-changes-feed.t test/etap/111-replication-changes-feed.t
Date Sat, 08 Aug 2009 16:20:29 GMT
Author: kocolosk
Date: Sat Aug  8 16:20:29 2009
New Revision: 802416

URL: http://svn.apache.org/viewvc?rev=802416&view=rev
Log:
update rep_changes_feed to use feed=continuous and rep_httpc

Added:
    couchdb/trunk/test/etap/111-replication-changes-feed.t
      - copied, changed from r802066, couchdb/trunk/test/etap/110-replication-changes-feed.t
Removed:
    couchdb/trunk/test/etap/110-replication-changes-feed.t
Modified:
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=802416&r1=802415&r2=802416&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Sat Aug  8 16:20:29 2009
@@ -15,17 +15,21 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([start/2, start_link/2, all/1, next/1, stop/1]).
+-export([start_link/4, next/1, stop/1]).
 
--define(MIN_BUFFER_SIZE, 100).
+-define(BUFFER_SIZE, 1000).
 
 -include("couch_db.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
--record (remote, {
-    conn,
-    reqid,
+-record (state, {
+    changes_from = nil,
+    changes_loop = nil,
     last_seq,
+    conn = nil,
+    reqid = nil,
+    by_seq_from = nil,
+    by_seq_loop = nil,
     complete = false,
     count = 0,
     partial_chunk = nil,
@@ -33,194 +37,110 @@
     rows = queue:new()
 }).
 
--record (local, {
-    changes_from = nil,
-    changes_pid = nil,
-    complete = false,
-    count = 0,
-    reply_to = nil,
-    rows = queue:new()
-}).
-
-start(Url, Options) ->
-    gen_server:start(?MODULE, [Url, Options], []).
-
-start_link(Url, Options) ->
-    gen_server:start_link(?MODULE, [Url, Options], []).
+start_link(Parent, Source, StartSeq, PostProps) ->
+    gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
 
-%% @doc does not block
-all(Server) ->
-    gen_server:call(Server, all_changes).
-
-%% @doc returns the next change from the feed, blocking if necessary
 next(Server) ->
-    gen_server:call(Server, next_change, infinity).
+    gen_server:call(Server, next_changes, infinity).
 
 stop(Server) ->
     gen_server:call(Server, stop).
 
-init([{remote, Url}, Options]) ->
-    Since = proplists:get_value(since, Options, 0),
-    Continuous = proplists:get_value(continuous, Options, false),
-    {Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
-        "?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
-    {ok, #remote{conn=Pid, last_seq=Since, reqid=ReqId}};
-
-init([{local, DbName}, Options]) when is_list(DbName) ->
-    init([{local, ?l2b(DbName)}, Options]);
-init([{local, DbName}, Options]) ->
-    ?LOG_DEBUG("initializing local changes feed for ~s with ~p", [DbName, Options]),
+init([_Parent, #http_db{}=Source, Since, PostProps]) ->
+    Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
+    false ->
+        normal;
+    true ->
+        continuous
+    end,
+    Pid = couch_rep_httpc:spawn_link_worker_process(Source),
+    Req = Source#http_db{
+        resource = "_changes",
+        qs = [{style, all_docs}, {heartbeat, true}, {since, Since},
+            {feed, Feed}],
+        conn = Pid,
+        options = [{stream_to, {self(), once}}, {response_format, binary}],
+        headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+    },    
+    {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
+
+    receive
+    {ibrowse_async_headers, ReqId, "200", _} ->
+        ibrowse:stream_next(ReqId),
+        {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId}};
+    {ibrowse_async_headers, ReqId, "301", Hdrs} ->
+        catch ibrowse:stop_worker_process(Pid),
+        Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+        %% TODO use couch_httpc:request instead of start_http_request
+        {Pid2, ReqId2} = start_http_request(Url2),
+        receive {ibrowse_async_headers, ReqId2, "200", _} ->
+            {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2}}
+        after 30000 ->
+            {stop, changes_timeout}
+        end;
+    {ibrowse_async_headers, ReqId, "404", _} ->
+        catch ibrowse:stop_worker_process(Pid),
+        ?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []),
+        Self = self(),
+        BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end),
+        {ok, #state{last_seq=Since, by_seq_loop=BySeqPid}};
+    {ibrowse_async_headers, ReqId, Code, _} ->
+        {stop, {changes_error_code, list_to_integer(Code)}}
+    after 10000 ->
+        {stop, changes_timeout}
+    end;
+
+init([_Parent, Source, Since, PostProps]) ->
     process_flag(trap_exit, true),
     Server = self(),
-    Since = proplists:get_value(since, Options, 0),
     ChangesPid =
-    case proplists:get_value(continuous, Options, false) of
+    case proplists:get_value(<<"continuous">>, PostProps, false) of
     false ->
-        spawn_link(fun() -> send_local_changes_once(Server, DbName, Since) end);
+        spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
     true ->
-        spawn_link(fun() -> send_local_changes_forever(Server, DbName, Since) end)
+        spawn_link(fun() -> send_local_changes_forever(Server, Source, Since) end)
     end,
-    {ok, #local{changes_pid=ChangesPid}}.
+    {ok, #state{changes_loop=ChangesPid}}.
 
-handle_call({add, Row}, _From, #local{count=Count, rows=Rows}=State) 
-        when Count < ?MIN_BUFFER_SIZE->
-    case State of
-    #local{reply_to=nil} ->
-        {reply, ok, State#local{count=Count+1, rows = queue:in(Row, Rows)}};
-    #local{count=0, reply_to=Requestor}->
-        gen_server:reply(Requestor, Row),
-        {reply, ok, State#local{reply_to=nil}}
-    end;
-handle_call({add, Row}, From, #local{}=State) ->
-    #local{
-        count = Count,
-        rows = Rows
-    } = State,
-    {noreply, State#local{count=Count+1, changes_from=From, rows=queue:in(Row,Rows)}};
+handle_call({add_change, Row}, From, State) ->
+    handle_add_change(Row, From, State);
 
-handle_call(all_changes, _From, #local{complete=Complete, count=Count}=State) 
-        when Complete =:= false; Count > 0 ->
-    #local{
-        changes_from = ChangesFrom,
-        rows = Rows
-    } = State,
-    if Count < ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
-        gen_server:reply(ChangesFrom, ok);
-    true -> ok end,
-    {reply, queue:to_list(Rows), State#local{count=0, rows=queue:new()}};
-handle_call(all_changes, _From, #local{}=State) ->
-    {stop, normal, complete, State};
-
-handle_call(all_changes, _From, #remote{complete=Complete, count=Count}=State)
-        when Complete =:= false; Count > 0 ->
-    #remote{
-        reqid = Id,
-        rows = Rows
-    } = State,
-    ok = maybe_stream_next(Complete, 0, Id),
-    {reply, queue:to_list(Rows), State#remote{count=0, rows=queue:new()}};
-handle_call(all_changes, _From, #remote{}=State) ->
-    {stop, normal, complete, State};
-
-handle_call(next_change, From, #local{count=0}=State) ->
-    if State#local.complete ->
-        {stop, normal, complete, State};
-    true ->
-        {noreply, State#local{reply_to=From}}
-    end;
-handle_call(next_change, _From, #local{}=State) ->
-    #local{
-        count = Count,
-        changes_from = ChangesFrom,
-        rows = Rows
-    } = State,
-    {{value, Row}, NewRows} = queue:out(Rows),
-    if Count =:= ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
-        gen_server:reply(ChangesFrom, ok),
-        {reply, Row, State#local{count=Count-1, changes_from=nil, rows=NewRows}};
-    true ->
-        {reply, Row, State#local{count=Count-1, rows=NewRows}}
-    end;
-
-handle_call(next_change, From, #remote{count=0}=State) ->
-    if State#remote.complete ->
-        {stop, normal, complete, State};
-    true ->
-        {noreply, State#remote{reply_to=From}}
-    end;
-handle_call(next_change, _From, #remote{}=State) ->
-    #remote{
-        reqid = Id,
-        complete = Complete,
-        count = Count,
-        rows = Rows
+handle_call(next_changes, From, State) ->
+    handle_next_changes(From, State);
+    
+handle_call(stop, _From, State) ->
+    #state{
+        changes_loop = ChangesPid,
+        conn = Conn
     } = State,
-    ok = maybe_stream_next(Complete, Count, Id),
-    {{value, Row}, NewRows} = queue:out(Rows),
-    {reply, Row, State#remote{count=Count-1, rows=NewRows}};
-
-handle_call(stop, _From, #local{changes_pid=ChangesPid} = State) ->
-    exit(ChangesPid, stop),
-    {stop, normal, ok, State};
-
-handle_call(stop, _From, #remote{conn=Conn} = State) ->
-    catch ibrowse:stop_worker_process(Conn),
+    if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end,
+    if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end,
     {stop, normal, ok, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({ibrowse_async_headers, Id, "200", _}, #remote{reqid=Id}=State) ->
-    #remote{
-        complete = Complete,
-        count = Count
-    } = State,
-    ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 200", [?MODULE, Id]),
-    ok = maybe_stream_next(Complete, Count, Id),
-    {noreply, State};
-handle_info({ibrowse_async_headers, Id, "301", Hdrs}, #remote{reqid=Id}=State) ->
-    ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 301", [?MODULE, Id]),
-    catch ibrowse:stop_worker_process(State#remote.conn),
-    Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
-    {Pid, ReqId} = start_http_request(Url),
-    {noreply, State#remote{conn=Pid, reqid=ReqId}};
-handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #remote{reqid=Id}=State) ->
-    ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
-        [Code,Hdrs]),
-    {stop, {error, list_to_integer(Code)}, State};
+handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
+    handle_headers(list_to_integer(Code), Hdrs, State);
 
-handle_info({ibrowse_async_response, _, {error, Reason}}, State) ->
-    {stop, {error, Reason}, State};
-handle_info({ibrowse_async_response, Id, Msg}, #remote{reqid=Id} = State) ->
-    ?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
-    {noreply, process_response(Msg, Id, State)};
-
-handle_info({ibrowse_async_response_end, Id}, #remote{reqid=Id} = State) ->
-    ?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#remote.reply_to]),
-    case State of
-    #remote{reply_to=nil} ->
-        {noreply, State#remote{complete=true}};
-    #remote{count=0, reply_to=From}->
-        gen_server:reply(From, complete),
-        {stop, normal, State}
-    end;
+handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
+    handle_response(Msg, State);
 
-handle_info({'EXIT', From, normal}, #local{changes_pid=From} = State) ->
-    if State#local.reply_to =/= nil ->
-        gen_server:reply(State#local.reply_to, complete),
-        {stop, normal, State};
-    true ->
-        {noreply, State#local{complete=true}}
-    end;
-handle_info({'EXIT', From, Reason}, #local{changes_pid=From} = State) ->
-    ?LOG_ERROR("changes_pid died with reason ~p", [Reason]),
-    {stop, changes_pid_died, State};
+handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
+    handle_feed_completion(State);
+
+handle_info({'EXIT', From, normal}, #state{changes_loop=From} = State) ->
+    handle_feed_completion(State);
+
+handle_info({'EXIT', From, Reason}, #state{changes_loop=From} = State) ->
+    ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
+    {stop, changes_loop_died, State};
 
 handle_info(Msg, State) ->
     ?LOG_INFO("unexpected message ~p", [Msg]),
     {noreply, State}.
 
-terminate(_Reason, #remote{conn=Pid}) when is_pid(Pid) ->
+terminate(_Reason, #state{conn=Pid}) when is_pid(Pid) ->
     catch ibrowse:stop_worker_process(Pid),
     ok;
 terminate(_Reason, _State) ->
@@ -231,66 +151,141 @@
 
 %internal funs
 
-process_response(<<"{\"results\":[\n">>, Id, State) ->
-    #remote{
-        complete = Complete,
-        count = Count
+handle_add_change(Row, From, #state{reply_to=nil} = State) ->
+    #state{
+        count = Count,
+        rows = Rows
     } = State,
-    ok = maybe_stream_next(Complete, Count, Id),
-    State;
-process_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, _, State) ->
+    NewState = State#state{count=Count+1, rows=queue:in(Row,Rows)},
+    if Count < ?BUFFER_SIZE ->
+        {reply, ok, NewState};
+    true ->
+        {noreply, NewState#state{changes_from=From}}
+    end;
+handle_add_change(Row, _From, #state{count=0} = State) ->
+    gen_server:reply(State#state.reply_to, [Row]),
+    {reply, ok, State#state{reply_to=nil}}.
+
+handle_next_changes(From, #state{count=0}=State) ->
+    if State#state.complete ->
+        {stop, normal, complete, State};
+    true ->
+        {noreply, State#state{reply_to=From}}
+    end;
+handle_next_changes(_From, State) ->
+    #state{
+        changes_from = ChangesFrom,
+        rows = Rows
+    } = State,
+    NewState = State#state{count=0, changes_from=nil, rows=queue:new()},
+    ok = maybe_stream_next(NewState),
+    if ChangesFrom =/= nil -> gen_server:reply(ChangesFrom, ok); true -> ok end,
+    {reply, queue:to_list(Rows), NewState}.
+
+handle_headers(200, _, State) ->
+    ok = maybe_stream_next(State),
+    {noreply, State};
+handle_headers(301, Hdrs, State) ->
+    catch ibrowse:stop_worker_process(State#state.conn),
+    Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+    %% TODO use couch_httpc:request instead of start_http_request
+    {Pid, ReqId} = start_http_request(Url),
+    {noreply, State#state{conn=Pid, reqid=ReqId}};
+handle_headers(Code, Hdrs, State) ->
+    ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
+        [Code,Hdrs]),
+    {stop, {error, Code}, State}.
+
+handle_response({error, Reason}, State) ->
+    {stop, {error, Reason}, State};
+handle_response(<<"\n">>, State) ->
+    ?LOG_DEBUG("got a heartbeat from the remote server", []),
+    {noreply, State};
+handle_response(<<"{\"results\":[\n">>, State) ->
+    ok = maybe_stream_next(State),
+    {noreply, State};
+handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
     LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
-    State#remote{last_seq = LastSeq};
-process_response(Chunk, Id, #remote{partial_chunk=nil} = State) ->
-    #remote{
-        complete = Complete,
+    {noreply, State#state{last_seq = LastSeq}};
+handle_response(Chunk, #state{partial_chunk=nil} = State) ->
+    #state{
         count = Count,
         rows = Rows
     } = State,
+    ok = maybe_stream_next(State),
     try
         Row = decode_row(Chunk),
-        ok = maybe_stream_next(Complete, Count+1, Id),
         case State of
-        #remote{reply_to=nil} ->
-            State#remote{count=Count+1, rows = queue:in(Row, Rows)};
-        #remote{count=0, reply_to=From}->
-            gen_server:reply(From, Row),
-            State#remote{reply_to=nil}
+        #state{reply_to=nil} ->
+            {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
+        #state{count=0, reply_to=From}->
+            gen_server:reply(From, [Row]),
+            {noreply, State#state{reply_to=nil}}
         end
     catch
     throw:{invalid_json, Bad} ->
-        ?LOG_DEBUG("got invalid_json ~p", [Bad]),
-        ok = maybe_stream_next(Complete, Count, Id),
-        State#remote{partial_chunk = Bad}
+        {noreply, State#state{partial_chunk = Bad}}
     end;
-process_response(Chunk, Id, State) ->
-    #remote{
-        complete = Complete,
+handle_response(Chunk, State) ->
+    #state{
         count = Count,
         partial_chunk = Partial,
         rows = Rows
     } = State,
+    ok = maybe_stream_next(State),
     try
         Row = decode_row(<<Partial/binary, Chunk/binary>>),
-        ok = maybe_stream_next(Complete, Count+1, Id),
-        case State of
-        #remote{reply_to=nil} ->
-            State#remote{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
-        #remote{count=0, reply_to=From}->
-            gen_server:reply(From, Row),
-            State#remote{reply_to=nil, partial_chunk=nil}
-        end
+        {noreply, case State of
+        #state{reply_to=nil} ->
+            State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+        #state{count=0, reply_to=From}->
+            gen_server:reply(From, [Row]),
+            State#state{reply_to=nil, partial_chunk=nil}
+        end}
     catch
     throw:{invalid_json, Bad} ->
-        ?LOG_DEBUG("got invalid_json ~p", [Bad]),
-        ok = maybe_stream_next(Complete, Count, Id),
-        State#remote{partial_chunk = Bad}
+        {noreply, State#state{partial_chunk = Bad}}
     end.
-    
+
+handle_feed_completion(#state{reply_to=nil} = State)->
+    {noreply, State#state{complete=true}};
+handle_feed_completion(#state{count=0} = State) ->
+    gen_server:reply(State#state.reply_to, complete),
+    {stop, normal, State}.
+
+by_seq_loop(Server, Source, StartSeq) ->
+    Req = Source#http_db{
+        resource = "_all_docs_by_seq",
+        qs = [{limit, 1000}, {startkey, StartSeq}]
+    },
+    {Results} = couch_rep_httpc:request(Req),
+    if Results =:= [] -> exit(normal); true -> ok end,
+    EndSeq = lists:foldl(fun({RowInfoList}, _) ->
+        Id = proplists:get_value(<<"id">>, RowInfoList),
+        Seq = proplists:get_value(<<"key">>, RowInfoList),
+        {RowProps} = proplists:get_value(<<"value">>, RowInfoList),
+        RawRevs = [
+            proplists:get_value(<<"rev">>, RowProps),
+            proplists:get_value(<<"conflicts">>, RowProps, []),
+            proplists:get_value(<<"deleted_conflicts">>, RowProps, [])
+        ],
+        ParsedRevs = couch_doc:parse_revs(lists:flatten(RawRevs)),
+        Change = {[
+            {<<"seq">>, Seq},
+            {<<"Id">>, Id},
+            {<<"changes">>, [{[{<<"rev">>,R}]} || R <- ParsedRevs]}
+        ]},
+        gen_server:call(Server, {add_change, Change}),
+        Seq
+    end, 0, proplists:get_value(<<"rows">>, Results)),
+    by_seq_loop(Server, Source, EndSeq+1).
+
 decode_row(<<",\n", Rest/binary>>) ->
     decode_row(Rest);
 decode_row(Row) ->
-    ?JSON_DECODE(Row).
+    {[Seq, Id, {<<"changes">>,C}]} = ?JSON_DECODE(Row),
+    C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C],
+    {[Seq, Id, {<<"changes">>,C2}]}.
 
 flush_updated_messages() ->
     receive updated -> flush_updated_messages()
@@ -304,27 +299,28 @@
 local_update_notification(_, _, _) ->
     ok.
 
-maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE ->
-    ?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
-    ibrowse:stream_next(Id);
-maybe_stream_next(_Complete, _Count, Id) ->
-    ?LOG_DEBUG("~p reqid ~p not streaming", [?MODULE, Id]),
+maybe_stream_next(#state{reqid=nil}) ->
+    ok;
+maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE ->
+    ibrowse:stream_next(S#state.reqid);
+maybe_stream_next(_) ->
     ok.
 
-send_local_changes_forever(Server, DbName, Since) ->
+send_local_changes_forever(Server, Db, Since) ->
+    #db{name = DbName, user_ctx = UserCtx} = Db,
     Self = self(),
     {ok, _} = couch_db_update_notifier:start_link(
         fun(Msg) -> local_update_notification(Self, DbName, Msg) end),
-    {ok, NewSeq} = send_local_changes_once(Server, DbName, Since),
+    {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
+    couch_db:close(Db),
     ok = wait_db_updated(),
-    send_local_changes_forever(Server, DbName, NewSeq).
-
-send_local_changes_once(Server, DbName, Since) ->
-    {ok, Db} = couch_db:open(DbName, []),
+    {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    send_local_changes_forever(Server, NewDb, NewSeq).
 
+send_local_changes_once(Server, Db, Since) ->
     FilterFun =
     fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
-        {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
+        {[{<<"rev">>, Rev}]}
     end,
 
     ChangesFun =
@@ -333,9 +329,8 @@
         Results = [Result || Result <- Results0, Result /= null],
         if Results /= [] ->
             Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]},
-            gen_server:call(Server, {add, Change}, infinity);
+            gen_server:call(Server, {add_change, Change}, infinity);
         true ->
-            ?LOG_DEBUG("Results was empty ~p", [Results0]),
             ok
         end,
         {ok, Seq}

Copied: couchdb/trunk/test/etap/111-replication-changes-feed.t (from r802066, couchdb/trunk/test/etap/110-replication-changes-feed.t)
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/111-replication-changes-feed.t?p2=couchdb/trunk/test/etap/111-replication-changes-feed.t&p1=couchdb/trunk/test/etap/110-replication-changes-feed.t&r1=802066&r2=802416&rev=802416&view=diff
==============================================================================
--- couchdb/trunk/test/etap/110-replication-changes-feed.t (original)
+++ couchdb/trunk/test/etap/111-replication-changes-feed.t Sat Aug  8 16:20:29 2009
@@ -17,12 +17,33 @@
 -record(doc, {id= <<"">>, revs={0, []}, body={[]},
             attachments=[], deleted=false, meta=[]}).
 
+-record(http_db, {
+    url,
+    auth = [],
+    resource = "",
+    headers = [
+        {"User-Agent", "CouchDb/"++couch_server:get_version()},
+        {"Accept", "application/json"},
+        {"Accept-Encoding", "gzip"}
+    ],
+    qs = [],
+    method = get,
+    body = nil,
+    options = [
+        {response_format,binary},
+        {inactivity_timeout, 30000}
+    ],
+    retries = 10,
+    pause = 1,
+    conn = nil
+}).
 main(_) ->
     code:add_pathz("src/couchdb"),
     code:add_pathz("src/ibrowse"),
     code:add_pathz("src/mochiweb"),
+    code:add_pathz("src/erlang-oauth"),
     
-    etap:plan(17),
+    etap:plan(13),
     case (catch test()) of
         ok ->
             etap:end_tests();
@@ -60,14 +81,13 @@
     test_since_parameter(Type),
     test_continuous_parameter(Type),
     test_conflicts(Type),
-    test_deleted_conflicts(Type),
-    test_non_blocking_call(Type).
+    test_deleted_conflicts(Type).
 
 test_remote_only() ->
     test_chunk_reassembly(remote).
 
 test_unchanged_db(Type) ->
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, []),
+    {ok, Pid} = start_changes_feed(Type, 0, false),
     etap:is(
         couch_rep_changes_feed:next(Pid),
         complete,
@@ -78,16 +98,15 @@
 
 test_simple_change(Type) ->
     Expect = generate_change(),
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, []),
+    {ok, Pid} = start_changes_feed(Type, 0, false),
     etap:is(
         {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
-        {Expect, complete},
+        {[Expect], complete},
         io_lib:format("(~p) change one document, get one row", [Type])
     ).
 
 test_since_parameter(Type) ->
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, 
-        [{since, get_update_seq()}]),
+    {ok, Pid} = start_changes_feed(Type, get_update_seq(), false), 
     etap:is(
         couch_rep_changes_feed:next(Pid),
         complete,
@@ -97,8 +116,7 @@
     ).
 
 test_continuous_parameter(Type) ->
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
-        [{since, get_update_seq()}, {continuous, true}]),
+    {ok, Pid} = start_changes_feed(Type, get_update_seq(), true),
 
     % make the changes_feed request before the next update
     Self = self(),
@@ -110,9 +128,9 @@
     Expect = generate_change(),
     etap:is(
         receive {actual, Actual} -> Actual end,
-        Expect,
+        [Expect],
         io_lib:format(
-            "(~p) continuous query-string parameter picks up new changes",
+            "(~p) feed=continuous query-string parameter picks up new changes",
             [Type])
     ),
 
@@ -121,11 +139,10 @@
 test_conflicts(Type) ->
     Since = get_update_seq(),
     Expect = generate_conflict(),
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, 
-        [{since, Since}]),
+    {ok, Pid} = start_changes_feed(Type, Since, false),
     etap:is(
         {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
-        {Expect, complete},
+        {[Expect], complete},
         io_lib:format("(~p) conflict revisions show up in feed", [Type])
     ).
 
@@ -138,7 +155,7 @@
     [Win, {[{<<"rev">>, Lose}]}] = proplists:get_value(<<"changes">>, ExpectProps),
     Doc = couch_doc:from_json_obj({[
         {<<"_id">>, Id},
-        {<<"_rev">>, Lose},
+        {<<"_rev">>, couch_doc:rev_to_str(Lose)},
         {<<"_deleted">>, true}
     ]}),
     Db = get_db(),
@@ -148,51 +165,35 @@
     Expect = {[
         {<<"seq">>, get_update_seq()},
         {<<"id">>, Id},
-        {<<"changes">>, [Win, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
+        {<<"changes">>, [Win, {[{<<"rev">>, Rev}]}]}
     ]},
     
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, 
-        [{since, Since}]),
+    {ok, Pid} = start_changes_feed(Type, Since, false),
     etap:is(
         {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
-        {Expect, complete},
+        {[Expect], complete},
         io_lib:format("(~p) deleted conflict revisions show up in feed", [Type])
     ).
 
-test_non_blocking_call(Type) ->
-    Since = get_update_seq(),
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
-        [{since, Since}, {continuous, true}]),
-    etap:is(
-        couch_rep_changes_feed:all(Pid),
-        [],
-        io_lib:format("(~p) all() returns empty list if no changes available",
-            [Type])
-    ),
-    Expect1 = generate_change(),
-    Expect2 = generate_change(),
-    timer:sleep(100),
-    etap:is(
-        couch_rep_changes_feed:all(Pid),
-        [Expect1, Expect2],
-        io_lib:format("(~p) all() returns full list of outstanding changes",
-            [Type])
-    ),
-    ok = couch_rep_changes_feed:stop(Pid).
-
 test_chunk_reassembly(Type) ->
     Since = get_update_seq(),
     Expect = [generate_change() || _I <- lists:seq(1,30)],
-    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
-        [{since, Since}]),
-    timer:sleep(100),
+    {ok, Pid} = start_changes_feed(Type, Since, false),
     etap:is(
-        couch_rep_changes_feed:all(Pid),
+        get_all_changes(Pid, []),
         Expect,
         io_lib:format("(~p) reassembles chunks split across TCP frames",
             [Type])
     ).
 
+get_all_changes(Pid, Acc) ->
+    case couch_rep_changes_feed:next(Pid) of
+    complete ->
+        lists:flatten(lists:reverse(Acc));
+    Else ->
+        get_all_changes(Pid, [Else|Acc])
+    end.
+
 generate_change() ->
     generate_change(couch_util:new_uuid()).
 
@@ -207,7 +208,7 @@
     {[
         {<<"seq">>, get_update_seq()},
         {<<"id">>, Id},
-        {<<"changes">>, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
+        {<<"changes">>, [{[{<<"rev">>, Rev}]}]}
     ]}.
 
 generate_conflict() ->
@@ -219,7 +220,7 @@
     {ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]),
     
     %% relies on undocumented CouchDB conflict winner algo and revision sorting!
-    RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R
+    RevList = [{[{<<"rev">>, R}]} || R
         <- lists:sort(fun(A,B) -> B<A end, [Rev1,Rev2])],
     {[
         {<<"seq">>, get_update_seq()},
@@ -234,10 +235,18 @@
 get_dbname(local) ->
     "etap-test-db";
 get_dbname(remote) ->
-    "http://127.0.0.1:5984/etap-test-db".
+    "http://127.0.0.1:5984/etap-test-db/".
 
 get_update_seq() ->
     Db = get_db(),
     Seq = couch_db:get_update_seq(Db),
     couch_db:close(Db),
     Seq.
+
+start_changes_feed(local, Since, Continuous) ->
+    Props = [{<<"continuous">>, Continuous}],
+    couch_rep_changes_feed:start_link(self(), get_db(), Since, Props);
+start_changes_feed(remote, Since, Continuous) ->
+    Props = [{<<"continuous">>, Continuous}],
+    Db = #http_db{url = get_dbname(remote)},
+    couch_rep_changes_feed:start_link(self(), Db, Since, Props).



Mime
View raw message