couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jch...@apache.org
Subject svn commit: r916518 - in /couchdb/trunk: share/www/script/test/ src/couchdb/ test/etap/
Date Fri, 26 Feb 2010 01:11:03 GMT
Author: jchris
Date: Fri Feb 26 01:11:02 2010
New Revision: 916518

URL: http://svn.apache.org/viewvc?rev=916518&view=rev
Log:
fdmananas patch for filtered replication via COUCHDB-673

Modified:
    couchdb/trunk/share/www/script/test/replication.js
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_db.hrl
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_query_servers.erl
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl
    couchdb/trunk/test/etap/111-replication-changes-feed.t

Modified: couchdb/trunk/share/www/script/test/replication.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replication.js?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/replication.js (original)
+++ couchdb/trunk/share/www/script/test/replication.js Fri Feb 26 01:11:02 2010
@@ -377,4 +377,81 @@
     T(docFoo666 === null);
   }
 
+  // test filtered replication
+
+  var sourceDb = new CouchDB(
+    "test_suite_filtered_rep_db_a", {"X-Couch-Full-Commit":"false"}
+  );
+
+  sourceDb.deleteDb();
+  sourceDb.createDb();
+
+  T(sourceDb.save({_id:"foo1",value:1}).ok);
+  T(sourceDb.save({_id:"foo2",value:2}).ok);
+  T(sourceDb.save({_id:"foo3",value:3}).ok);
+  T(sourceDb.save({_id:"foo4",value:4}).ok);
+  T(sourceDb.save({
+    "_id": "_design/mydesign",
+    "language" : "javascript",
+    "filters" : {
+      "myfilter" : (function(doc, req) {
+        if (doc.value < Number(req.query.maxvalue)) {
+          return true;
+        } else {
+          return false;
+        }
+      }).toString()
+    }
+  }).ok);
+
+  var dbPairs = [
+    {source:"test_suite_filtered_rep_db_a",
+      target:"test_suite_filtered_rep_db_b"},
+    {source:"test_suite_filtered_rep_db_a",
+      target:"http://" + host + "/test_suite_filtered_rep_db_b"},
+    {source:"http://" + host + "/test_suite_filtered_rep_db_a",
+      target:"test_suite_filtered_rep_db_b"},
+    {source:"http://" + host + "/test_suite_filtered_rep_db_a",
+      target:"http://" + host + "/test_suite_filtered_rep_db_b"}
+  ];
+
+  for (var i = 0; i < dbPairs.length; i++) {
+    var targetDb = new CouchDB("test_suite_filtered_rep_db_b");
+    targetDb.deleteDb();
+    targetDb.createDb();
+
+    var dbA = dbPairs[i].source;
+    var dbB = dbPairs[i].target;
+
+    var repResult = CouchDB.replicate(dbA, dbB, {
+      body: {
+        "filter" : "mydesign/myfilter",
+        "query_params" : {
+          "maxvalue": "3"
+        }
+      }
+    });
+
+    T(repResult.ok);
+    T($.isArray(repResult.history));
+    T(repResult.history.length === 1);
+    T(repResult.history[0].docs_written === 2);
+    T(repResult.history[0].docs_read === 2);
+    T(repResult.history[0].doc_write_failures === 0);
+
+    var docFoo1 = targetDb.open("foo1");
+    T(docFoo1 !== null);
+    T(docFoo1.value === 1);
+
+    var docFoo2 = targetDb.open("foo2");
+    T(docFoo2 !== null);
+    T(docFoo2.value === 2);
+
+    var docFoo3 = targetDb.open("foo3");
+    T(docFoo3 === null);
+
+    var docFoo4 = targetDb.open("foo4");
+    T(docFoo4 === null);
+  }
+
 };

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Fri Feb 26 01:11:02 2010
@@ -30,6 +30,7 @@
     couch.erl \
     couch_app.erl \
     couch_btree.erl \
+    couch_changes.erl \
     couch_config.erl \
     couch_config_writer.erl \
     couch_db.erl \
@@ -86,6 +87,7 @@
     couch.beam \
     couch_app.beam \
     couch_btree.beam \
+    couch_changes.beam \
     couch_config.beam \
     couch_config_writer.beam \
     couch_db.beam \

Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Fri Feb 26 01:11:02 2010
@@ -264,3 +264,15 @@
 
 % small value used in revision trees to indicate the revision isn't stored
 -define(REV_MISSING, []).
+
+-record(changes_args, {
+    feed = "normal",
+    dir = fwd,
+    since = 0,
+    limit = 1000000000000000,
+    style = main_only,
+    heartbeat,
+    timeout,
+    filter = "",
+    include_docs = false
+}).

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Fri Feb 26 01:11:02 2010
@@ -55,200 +55,62 @@
         do_db_req(Req, Handler)
     end.
 
-get_changes_timeout(Req, Resp) ->
-    DefaultTimeout = list_to_integer(
-            couch_config:get("httpd", "changes_timeout", "60000")),
-    case couch_httpd:qs_value(Req, "heartbeat") of
-    undefined ->
-        case couch_httpd:qs_value(Req, "timeout") of
-        undefined ->
-            {DefaultTimeout, fun() -> stop end};
-        TimeoutList ->
-            {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
-                fun() -> stop end}
-        end;
-    "true" ->
-        {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
-    TimeoutList ->
-        {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
-            fun() -> send_chunk(Resp, "\n"), ok end}
-    end.
-
-
-start_sending_changes(_Resp, "continuous") ->
-    ok;
-start_sending_changes(Resp, _Else) ->
-    send_chunk(Resp, "{\"results\":[\n").
-
-handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
-    FilterFun = make_filter_fun(Req, Db),
-    {ok, Info} = couch_db:get_db_info(Db),
-    Seq = proplists:get_value(update_seq, Info),
-    {Dir, StartSeq} = case couch_httpd:qs_value(Req, "descending", "false") of 
-        "false" -> 
-            {fwd, list_to_integer(couch_httpd:qs_value(Req, "since", "0"))}; 
-        "true" -> 
-            {rev, Seq};
-        _Bad -> throw({bad_request, "descending must be true or false"})
+handle_changes_req(#httpd{method='GET'}=Req, Db) ->
+    MakeCallback = fun(Resp) ->
+        fun({change, Change, _}, "continuous") ->
+            send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
+        ({change, Change, Prepend}, _) ->
+            send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
+        (start, "continuous") ->
+            ok;
+        (start, _) ->
+            send_chunk(Resp, "{\"results\":[\n");
+        ({stop, EndSeq}, "continuous") ->
+            send_chunk(
+                Resp,
+                [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]
+            ),
+            end_json_response(Resp);
+        ({stop, EndSeq}, _) ->
+            send_chunk(
+                Resp,
+                io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])
+            ),
+            end_json_response(Resp);
+        (timeout, _) ->
+            send_chunk(Resp, "\n")
+        end
     end,
-    Limit = list_to_integer(couch_httpd:qs_value(Req, "limit", "1000000000000000")),
-    ResponseType = couch_httpd:qs_value(Req, "feed", "normal"),
-    if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
-        {ok, Resp} = start_json_response(Req, 200),
-        start_sending_changes(Resp, ResponseType),
-
-        Self = self(),
-        {ok, Notify} = couch_db_update_notifier:start_link(
-            fun({_, DbName0}) when DbName0 == DbName ->
-                Self ! db_updated;
-            (_) ->
-                ok
-            end),
-        {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
-        couch_stats_collector:track_process_count(Self,
-                            {httpd, clients_requesting_changes}),
-        try
-            keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout,
-                TimeoutFun, ResponseType, Limit, FilterFun)
-        after
-            couch_db_update_notifier:stop(Notify),
-            get_rest_db_updated() % clean out any remaining update messages
-        end;
-    true ->
+    ChangesArgs = parse_changes_query(Req),
+    ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+    WrapperFun = case ChangesArgs#changes_args.feed of
+    "normal" ->
+        {ok, Info} = couch_db:get_db_info(Db),
         CurrentEtag = couch_httpd:make_etag(Info),
-        couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
-            % send the etag
-            {ok, Resp} = start_json_response(Req, 200, [{"Etag", CurrentEtag}]),
-            start_sending_changes(Resp, ResponseType),
-            {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
-                    send_changes(Req, Resp, Db, Dir, StartSeq, <<"">>, "normal",
-                        Limit, FilterFun),
-            end_sending_changes(Resp, LastSeq, ResponseType)
-        end)
-    end;
+        fun(FeedChangesFun) ->
+            couch_httpd:etag_respond(
+                Req,
+                CurrentEtag,
+                fun() ->
+                    {ok, Resp} = couch_httpd:start_json_response(
+                         Req, 200, [{"Etag", CurrentEtag}]
+                    ),
+                    FeedChangesFun(MakeCallback(Resp))
+                end
+            )
+        end;
+    _ ->
+        % "longpoll" or "continuous"
+        {ok, Resp} = couch_httpd:start_json_response(Req, 200),
+        fun(FeedChangesFun) ->
+            FeedChangesFun(MakeCallback(Resp))
+        end
+    end,
+    WrapperFun(ChangesFun);
 
 handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
-% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
-    receive db_updated -> get_rest_db_updated()
-    after Timeout ->
-        case TimeoutFun() of
-        ok -> wait_db_updated(Timeout, TimeoutFun);
-        stop -> stop
-        end
-    end.
-
-get_rest_db_updated() ->
-    receive db_updated -> get_rest_db_updated()
-    after 0 -> updated
-    end.
-    
-end_sending_changes(Resp, EndSeq, "continuous") ->
-    send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
-    end_json_response(Resp);
-end_sending_changes(Resp, EndSeq, _Else) ->
-    send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])),
-    end_json_response(Resp).
-
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp,
-        Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType, Limit, Filter) ->
-    {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(Req, Resp, Db, fwd,
StartSeq,
-        Prepend, ResponseType, Limit, Filter),
-    couch_db:close(Db),
-    if
-    Limit > NewLimit, ResponseType == "longpoll" ->
-        end_sending_changes(Resp, EndSeq, ResponseType);
-    true ->
-        case wait_db_updated(Timeout, TimeoutFun) of
-        updated ->
-            case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
-            {ok, Db2} ->
-                keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout,
-                    TimeoutFun, ResponseType, NewLimit, Filter);
-            _Else ->
-                end_sending_changes(Resp, EndSeq, ResponseType)
-            end;
-        stop ->
-            end_sending_changes(Resp, EndSeq, ResponseType)
-        end
-    end.
-
-changes_enumerator(DocInfos, {Db, _, _, FilterFun, Resp, "continuous", Limit, IncludeDocs})
->
-    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
-    Results0 = FilterFun(DocInfos),
-    Results = [Result || Result <- Results0, Result /= null],
-    Go = if Limit =< 1 -> stop; true -> ok end,
-    case Results of
-    [] ->
-        {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit, IncludeDocs}};
-    _ ->
-        send_chunk(Resp, [?JSON_ENCODE(changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))
-            |"\n"]),
-        {Go, {Db, Seq, nil, FilterFun, Resp, "continuous",  Limit-1, IncludeDocs}}
-    end;
-changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Resp, _, Limit, IncludeDocs}) ->
-    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
-    Results0 = FilterFun(DocInfos),
-    Results = [Result || Result <- Results0, Result /= null],
-    Go = if Limit =< 1 -> stop; true -> ok end,
-    case Results of
-    [] ->
-        {Go, {Db, Seq, Prepend, FilterFun, Resp, nil, Limit, IncludeDocs}};
-    _ ->
-        send_chunk(Resp, [Prepend, ?JSON_ENCODE(
-            changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))]),
-        {Go, {Db, Seq, <<",\n">>, FilterFun, Resp, nil, Limit-1, IncludeDocs}}
-    end.
-
-changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
-    {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del) ++
-        couch_httpd_view:doc_member(Db, {Id, Rev})};
-changes_row(_, Seq, Id, Del, Results, _, false) ->
-    {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del)}.
-
-deleted_item(true) -> [{deleted,true}];
-deleted_item(_) -> [].
-
-send_changes(Req, Resp, Db, Dir, StartSeq, Prepend, ResponseType, Limit, FilterFun) ->
-    Style = list_to_existing_atom(
-            couch_httpd:qs_value(Req, "style", "main_only")),
-    IncludeDocs = list_to_existing_atom(
-            couch_httpd:qs_value(Req, "include_docs", "false")),
-    couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, 
-            [{dir, Dir}], {Db, StartSeq, Prepend, FilterFun, Resp, ResponseType, Limit, IncludeDocs}).
-
-make_filter_fun(Req, Db) ->
-    Filter = couch_httpd:qs_value(Req, "filter", ""),
-    case [list_to_binary(couch_httpd:unquote(Part))
-            || Part <- string:tokens(Filter, "/")] of
-    [] ->
-        fun(DocInfos) ->
-        % doing this as a batch is more efficient for external filters
-            [{[{rev, couch_doc:rev_to_str(Rev)}]} ||
-                #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
-        end;
-    [DName, FName] ->
-        DesignId = <<"_design/", DName/binary>>,
-        DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
-        % validate that the ddoc has the filter fun
-        #doc{body={Props}} = DDoc,
-        couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
-        fun(DocInfos) ->
-            Docs = [Doc || {ok, Doc} <- [
-                {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
-                || DInfo <- DocInfos]],
-            {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
-            [{[{rev, couch_doc:rev_to_str(Rev)}]}
-                || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, 
-                Pass <- Passes, Pass == true]
-        end;
-    _Else ->
-        throw({bad_request, 
-            "filter parameter must be of the form `designname/filtername`"})
-    end.  
-
 handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) ->
     ok = couch_view_compactor:start_compact(DbName, Id),
     send_json(Req, 202, {[{ok, true}]});
@@ -1188,6 +1050,33 @@
         end
     end, #doc_query_args{}, couch_httpd:qs(Req)).
 
+parse_changes_query(Req) ->
+    lists:foldl(fun({Key, Value}, Args) ->
+        case {Key, Value} of
+        {"feed", _} ->
+            Args#changes_args{feed=Value};
+        {"descending", "true"} ->
+            Args#changes_args{dir=rev};
+        {"since", _} ->
+            Args#changes_args{since=list_to_integer(Value)};
+        {"limit", _} ->
+            Args#changes_args{limit=list_to_integer(Value)};
+        {"style", _} ->
+            Args#changes_args{style=list_to_existing_atom(Value)};
+        {"heartbeat", "true"} ->
+            Args#changes_args{heartbeat=true};
+        {"heartbeat", _} ->
+            Args#changes_args{heartbeat=list_to_integer(Value)};
+        {"timeout", _} ->
+            Args#changes_args{timeout=list_to_integer(Value)};
+        {"include_docs", "true"} ->
+            Args#changes_args{include_docs=true};
+        {"filter", _} ->
+            Args#changes_args{filter=Value};
+        _Else -> % unknown key value pair, ignore.
+            Args
+        end
+    end, #changes_args{}, couch_httpd:qs(Req)).
 
 extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
     extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));

Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_query_servers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_query_servers.erl Fri Feb 26 01:11:02 2010
@@ -183,7 +183,12 @@
     couch_doc:to_json_obj(Doc, [revs]).
 
 filter_docs(Req, Db, DDoc, FName, Docs) ->
-    JsonReq = couch_httpd_external:json_req_obj(Req, Db),
+    JsonReq = case Req of
+    {json_req, JsonObj} ->
+        JsonObj;
+    #httpd{} = HttpReq ->
+        couch_httpd_external:json_req_obj(HttpReq, Db)
+    end,
     JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
     JsonCtx = couch_util:json_user_ctx(Db),
     [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq,
JsonCtx]),

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Feb 26 01:11:02 2010
@@ -451,7 +451,11 @@
     % Port = mochiweb_socket_server:get(couch_httpd, port),
     Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
     Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
   
-    Base = couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))),
+    Filter = proplists:get_value(<<"filter">>, Props),
+    QueryParams = proplists:get_value(<<"query_params">>, Props),
+    Base = couch_util:to_hex(erlang:md5(
+        term_to_binary([HostName, Src, Tgt, Filter, QueryParams])
+    )),
     Extension = maybe_append_options(
         [<<"continuous">>, <<"create_target">>], Props),
     {Base, Extension}.

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Fri Feb 26 01:11:02 2010
@@ -53,11 +53,35 @@
     true ->
         continuous
     end,
+    BaseQS = [
+        {"style", all_docs},
+        {"heartbeat", 10000},
+        {"since", Since},
+        {"feed", Feed}
+    ],
+    QS = case proplists:get_value(<<"filter">>, PostProps) of
+    undefined ->
+        BaseQS;
+    FilterName ->
+        {Params} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+        lists:foldr(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case proplists:is_defined(Ks, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, V} | QSAcc]
+                end
+            end,
+            [{"filter", FilterName} | BaseQS],
+            Params
+        )
+    end,
     Pid = couch_rep_httpc:spawn_link_worker_process(Source),
     Req = Source#http_db{
         resource = "_changes",
-        qs = [{style, all_docs}, {heartbeat, 10000}, {since, Since},
-            {feed, Feed}],
+        qs = QS,
         conn = Pid,
         options = [{stream_to, {self(), once}}, {response_format, binary},
             {inactivity_timeout, 31000}], % miss 3 heartbeats, assume death
@@ -94,20 +118,54 @@
 init([_Parent, Source, Since, PostProps] = InitArgs) ->
     process_flag(trap_exit, true),
     Server = self(),
-    ChangesPid =
-    case proplists:get_value(<<"continuous">>, PostProps, false) of
-    false ->
-        spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
-    true ->
-        spawn_link(fun() ->
-            Self = self(),
-            {ok, _} = couch_db_update_notifier:start_link(fun(Msg) ->
-                local_update_notification(Self, Source#db.name, Msg) end),
-            send_local_changes_forever(Server, Source, Since)
+    ChangesArgs = #changes_args{
+        style = all_docs,
+        since = Since,
+        filter = ?b2l(proplists:get_value(<<"filter">>, PostProps, <<>>)),
+        feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
+            true ->
+                "continuous";
+            false ->
+                "normal"
+        end
+    },
+    ChangesPid = spawn_link(fun() ->
+        ChangesFeedFun = couch_changes:handle_changes(
+            ChangesArgs,
+            {json_req, filter_json_req(Source, PostProps)},
+            Source
+        ),
+        ChangesFeedFun(fun({change, Change, _}, _) ->
+                gen_server:call(Server, {add_change, Change}, infinity);
+            (_, _) ->
+                ok
         end)
-    end,
+    end),
     {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
 
+filter_json_req(Db, PostProps) ->
+    case proplists:get_value(<<"filter">>, PostProps) of
+    undefined ->
+        {[]};
+    FilterName ->
+        {Query} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+        {ok, Info} = couch_db:get_db_info(Db),
+        % simulate a request to db_name/_changes
+        {[
+            {<<"info">>, {Info}},
+            {<<"id">>, null},
+            {<<"method">>, 'GET'},
+            {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+            {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+            {<<"headers">>, []},
+            {<<"body">>, []},
+            {<<"peer">>, <<"replicator">>},
+            {<<"form">>, []},
+            {<<"cookie">>, []},
+            {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+       ]}
+    end.
+
 handle_call({add_change, Row}, From, State) ->
     handle_add_change(Row, From, State);
 
@@ -302,23 +360,7 @@
 decode_row(<<",", Rest/binary>>) ->
     decode_row(Rest);
 decode_row(Row) ->
-    {Props} = ?JSON_DECODE(Row),
-    % [Seq, Id, {<<"changes">>,C}]
-    Seq = proplists:get_value(<<"seq">>, Props),
-    Id = proplists:get_value(<<"id">>, Props),
-    C = proplists:get_value(<<"changes">>, Props),
-    C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]}
<- C],
-    {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}.
-
-flush_updated_messages() ->
-    receive updated -> flush_updated_messages()
-    after 0 -> ok
-    end.
-
-local_update_notification(Self, DbName, {updated, DbName}) ->
-    Self ! updated;
-local_update_notification(_, _, _) ->
-    ok.
+    ?JSON_DECODE(Row).
 
 maybe_stream_next(#state{reqid=nil}) ->
     ok;
@@ -327,35 +369,6 @@
 maybe_stream_next(_) ->
     ok.
 
-send_local_changes_forever(Server, Db, Since) ->
-    #db{name = DbName, user_ctx = UserCtx} = Db,
-    {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
-    couch_db:close(Db),
-    ok = wait_db_updated(),
-    {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    send_local_changes_forever(Server, NewDb, NewSeq).
-
-send_local_changes_once(Server, Db, Since) ->
-    FilterFun =
-    fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
-        {[{<<"rev">>, Rev}]}
-    end,
-
-    ChangesFun =
-    fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, _) ->
-        Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
-        Results = [Result || Result <- Results0, Result /= null],
-        if Results /= [] ->
-            Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]},
-            gen_server:call(Server, {add_change, Change}, infinity);
-        true ->
-            ok
-        end,
-        {ok, Seq}
-    end,
-
-    couch_db:changes_since(Db, all_docs, Since, ChangesFun, Since).
-
 start_http_request(RawUrl) ->
     Url = ibrowse_lib:parse_url(RawUrl),
     {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
@@ -367,8 +380,3 @@
     {ibrowse_req_id, Id} = 
         ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
     {Pid, Id}.
-
-wait_db_updated() ->
-    receive updated ->
-        flush_updated_messages()
-    end.

Modified: couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl Fri Feb 26 01:11:02 2010
@@ -144,11 +144,15 @@
     changes_loop(OurServer, SourceChangesServer, Target).
 
 get_missing_revs(#http_db{}=Target, Changes) ->
-    Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]})
->
-        {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end,
+    Transform = fun({Props}) ->
+        C = proplists:get_value(<<"changes">>, Props),
+        Id = proplists:get_value(<<"id">>, Props),
+        {Id, [R || {[{<<"rev">>, R}]} <- C]}
+    end,
     IdRevsList = [Transform(Change) || Change <- Changes],
     SeqDict = changes_dictionary(Changes),
-    {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
+    {LastProps} = lists:last(Changes),
+    HighSeq = proplists:get_value(<<"seq">>, LastProps),
     Request = Target#http_db{
         resource = "_missing_revs",
         method = post,
@@ -165,11 +169,15 @@
     end;
 
 get_missing_revs(Target, Changes) ->
-    Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]})
->
-        {Id, [R || {[{<<"rev">>, R}]} <- C]} end,
+    Transform = fun({Props}) ->
+        C = proplists:get_value(<<"changes">>, Props),
+        Id = proplists:get_value(<<"id">>, Props),
+        {Id, [couch_doc:parse_rev(R) || {[{<<"rev">>, R}]} <- C]}
+    end,
     IdRevsList = [Transform(Change) || Change <- Changes],
     SeqDict = changes_dictionary(Changes),
-    {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
+    {LastProps} = lists:last(Changes),
+    HighSeq = proplists:get_value(<<"seq">>, LastProps),
     {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList),
     {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}.
 

Modified: couchdb/trunk/test/etap/111-replication-changes-feed.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/111-replication-changes-feed.t?rev=916518&r1=916517&r2=916518&view=diff
==============================================================================
--- couchdb/trunk/test/etap/111-replication-changes-feed.t (original)
+++ couchdb/trunk/test/etap/111-replication-changes-feed.t Fri Feb 26 01:11:02 2010
@@ -157,7 +157,7 @@
     [Win, {[{<<"rev">>, Lose}]}] = proplists:get_value(<<"changes">>,
ExpectProps),
     Doc = couch_doc:from_json_obj({[
         {<<"_id">>, Id},
-        {<<"_rev">>, couch_doc:rev_to_str(Lose)},
+        {<<"_rev">>, Lose},
         {<<"_deleted">>, true}
     ]}),
     Db = get_db(),
@@ -167,7 +167,7 @@
     Expect = {[
         {<<"seq">>, get_update_seq()},
         {<<"id">>, Id},
-        {<<"changes">>, [Win, {[{<<"rev">>, Rev}]}]}
+        {<<"changes">>, [Win, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
     ]},
     
     {ok, Pid} = start_changes_feed(Type, Since, false),
@@ -210,7 +210,7 @@
     {[
         {<<"seq">>, get_update_seq()},
         {<<"id">>, Id},
-        {<<"changes">>, [{[{<<"rev">>, Rev}]}]}
+        {<<"changes">>, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
     ]}.
 
 generate_conflict() ->
@@ -220,9 +220,9 @@
     Doc2 = (couch_doc:from_json_obj({[<<"foo">>, <<"baz">>]}))#doc{id
= Id},
     {ok, Rev1} = couch_db:update_doc(Db, Doc1, [full_commit]),
     {ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]),
-    
+
     %% relies on undocumented CouchDB conflict winner algo and revision sorting!
-    RevList = [{[{<<"rev">>, R}]} || R
+    RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R
         <- lists:sort(fun(A,B) -> B<A end, [Rev1,Rev2])],
     {[
         {<<"seq">>, get_update_seq()},



Mime
View raw message