couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r965581 - in /couchdb/branches/new_replicator: share/www/script/test/new_replication.js src/couchdb/couch_api_wrap.erl src/couchdb/couch_httpd_rep.erl src/couchdb/couch_replicate.erl
Date Mon, 19 Jul 2010 18:48:07 GMT
Author: fdmanana
Date: Mon Jul 19 18:48:07 2010
New Revision: 965581

URL: http://svn.apache.org/viewvc?rev=965581&view=rev
Log:
Added support for replication by doc IDs.

Modified:
    couchdb/branches/new_replicator/share/www/script/test/new_replication.js
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=965581&r1=965580&r2=965581&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Mon Jul 19 18:48:07 2010
@@ -40,7 +40,7 @@ couchTests.new_replication = function(de
   var sourceInfo, targetInfo;
   var docs, doc, copy;
   var repResult;
-  var i, j;
+  var i, j, k;
 
 
   function populateDb(db, docs, dontRecreateDb) {
@@ -413,6 +413,191 @@ couchTests.new_replication = function(de
   }
 
 
+  // test replication by doc IDs
+  docs = makeDocs(1, 11);
+  docs.push({
+    _id: "_design/foo",
+    language: "javascript",
+    integer: 1
+  });
+
+  var target_doc_ids = [
+    { initial: ["1", "2", "10"], after: [], conflict_id: "2" },
+    { initial: ["1", "2"], after: ["7"], conflict_id: "1" },
+    { initial: ["1", "foo_666", "10"], after: ["7"], conflict_id: "10" },
+    { initial: ["_design/foo", "8"], after: ["foo_5"], conflict_id: "8" },
+    { initial: [], after: ["foo_1000", "_design/foo", "1"], conflict_id: "1" }
+  ];
+  var doc_ids, after_doc_ids;
+  var id, num_inexistent_docs, after_num_inexistent_docs;
+  var total, after_total;
+
+  for (i = 0; i < dbPairs.length; i++) {
+
+    for (j = 0; j < target_doc_ids.length; j++) {
+      doc_ids = target_doc_ids[j].initial;
+      num_inexistent_docs = 0;
+
+      for (k = 0; k < doc_ids.length; k++) {
+        id = doc_ids[k];
+        if (id.indexOf("foo_") === 0) {
+          num_inexistent_docs += 1;
+        }
+      }
+
+      populateDb(sourceDb, docs);
+      populateDb(targetDb, []);
+
+      repResult = CouchDB.new_replicate(
+        dbPairs[i].source,
+        dbPairs[i].target,
+        {
+          body: {
+            doc_ids: doc_ids
+          }
+        }
+      );
+
+      total = doc_ids.length - num_inexistent_docs;
+      T(repResult.ok === true);
+      T(typeof repResult.start_time === "string");
+      T(typeof repResult.end_time === "string");
+      T(repResult.docs_read === total);
+      T(repResult.docs_written === total);
+      T(repResult.doc_write_failures === 0);
+
+      targetInfo = targetDb.info();
+      T(targetInfo.doc_count === total);
+
+      for (k = 0; k < doc_ids.length; k++) {
+        id = doc_ids[k];
+        doc = sourceDb.open(id);
+        copy = targetDb.open(id);
+
+        if (id.indexOf("foo_") === 0) {
+          T(doc === null);
+          T(copy === null);
+        } else {
+          T(doc !== null);
+          T(copy !== null);
+          for (var p in doc) {
+            T(copy[p] === doc[p]);
+          }
+        }
+      }
+
+      // add more docs through replication by doc IDs
+      after_doc_ids = target_doc_ids[j].after;
+      after_num_inexistent_docs = 0;
+
+      for (k = 0; k < after_doc_ids.length; k++) {
+        id = after_doc_ids[k];
+        if (id.indexOf("foo_") === 0) {
+          after_num_inexistent_docs += 1;
+        }
+      }
+
+      repResult = CouchDB.new_replicate(
+        dbPairs[i].source,
+        dbPairs[i].target,
+        {
+          body: {
+            doc_ids: after_doc_ids
+          }
+        }
+      );
+
+      after_total = after_doc_ids.length - after_num_inexistent_docs;
+      T(repResult.ok === true);
+      T(typeof repResult.start_time === "string");
+      T(typeof repResult.end_time === "string");
+      T(repResult.docs_read === after_total);
+      T(repResult.docs_written === after_total);
+      T(repResult.doc_write_failures === 0);
+
+      targetInfo = targetDb.info();
+      T(targetInfo.doc_count === (total + after_total));
+
+      for (k = 0; k < after_doc_ids.length; k++) {
+        id = after_doc_ids[k];
+        doc = sourceDb.open(id);
+        copy = targetDb.open(id);
+
+        if (id.indexOf("foo_") === 0) {
+          T(doc === null);
+          T(copy === null);
+        } else {
+          T(doc !== null);
+          T(copy !== null);
+          for (var p in doc) {
+            T(copy[p] === doc[p]);
+          }
+        }
+      }
+
+      // replicate again the same doc after updated on source (no conflict)
+      id = target_doc_ids[j].conflict_id;
+      doc = sourceDb.open(id);
+      T(doc !== null);
+      doc.integer += 100;
+      T(sourceDb.save(doc).ok);
+
+      repResult = CouchDB.new_replicate(
+        dbPairs[i].source,
+        dbPairs[i].target,
+        {
+          body: {
+            doc_ids: [id]
+          }
+        }
+      );
+
+      T(repResult.ok === true);
+      T(repResult.docs_read === 1);
+      T(repResult.docs_written === 1);
+      T(repResult.doc_write_failures === 0);
+
+      copy = targetDb.open(id, {conflicts: true});
+
+      T(copy._rev.indexOf("2-") === 0);
+      T(typeof copy._conflicts === "undefined");
+
+      // generate a conflict through replication by doc IDs
+      id = target_doc_ids[j].conflict_id;
+      doc = sourceDb.open(id);
+      copy = targetDb.open(id);
+      T(doc !== null);
+      T(copy !== null);
+      doc.integer += 100;
+      copy.integer += 1;
+      T(sourceDb.save(doc).ok);
+      T(targetDb.save(copy).ok);
+
+      repResult = CouchDB.new_replicate(
+        dbPairs[i].source,
+        dbPairs[i].target,
+        {
+          body: {
+            doc_ids: [id]
+          }
+        }
+      );
+
+      T(repResult.ok === true);
+      T(repResult.docs_read === 1);
+      T(repResult.docs_written === 1);
+      T(repResult.doc_write_failures === 0);
+
+      copy = targetDb.open(id, {conflicts: true});
+
+      T(copy._rev.indexOf("3-") === 0);
+      T(copy._conflicts instanceof Array);
+      T(copy._conflicts.length === 1);
+      T(copy._conflicts[0].indexOf("3-") === 0);
+    }
+  }
+
+
   // cleanup
   sourceDb.deleteDb();
   targetDb.deleteDb();

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=965581&r1=965580&r2=965581&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Mon Jul 19 18:48:07 2010
@@ -41,6 +41,7 @@
     update_doc/3,
     ensure_full_commit/1,
     get_missing_revs/2,
+    open_doc/4,
     open_doc_revs/6,
     update_doc/4,
     changes_since/6
@@ -203,6 +204,66 @@ open_doc_revs(Db, Id, Revs, Options, Fun
     {ok, lists:foldl(Fun, Acc, Results)}.
 
 
+open_doc(#httpdb{} = HttpDb, Id, Options, Fun) ->
+    #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb,
+    QArgs = [
+        {"attachments", "true"},
+        {"revs", "true"} |
+        options_to_query_args(Options, [])
+    ],
+    IdEncoded = case Id of
+    <<"_design/", RestId/binary>> ->
+        "_design/" ++ couch_util:url_encode(RestId);
+    _ ->
+        couch_util:url_encode(Id)
+    end,
+    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++
+        [{"accept", "application/json, multipart/related"} | Headers],
+    Self = self(),
+    Streamer = spawn_link(fun() ->
+            FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []),
+            #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+            {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl,
+                Headers2, get, [],
+                [{response_format, binary}, {stream_to, {self(), once}}],
+                infinity),
+
+            DataFun = fun() -> stream_data_self(ReqId) end,
+            receive
+            {ibrowse_async_headers, ReqId, "200", RespHeaders} ->
+                case couch_util:get_value("Content-Type", RespHeaders) of
+                ("multipart/related;" ++ _) = CType ->
+                     {ok, Doc} = couch_doc:doc_from_multi_part_stream(
+                         CType, DataFun);
+                "application/json" ->
+                     Doc = couch_doc:from_json_obj(
+                         json_stream_parse:to_ejson(DataFun))
+                end,
+                receive
+                {get_doc, From} ->
+                    From ! {doc, Doc}
+                end;
+            {ibrowse_async_headers, ReqId, _ErrorCode, _RespHeaders} ->
+                receive
+                {get_doc, From} ->
+                    From ! {error, json_stream_parse:to_ejson(DataFun)}
+                end
+            end,
+            catch ibrowse:stop_worker_process(Worker),
+            unlink(Self)
+        end),
+    Streamer ! {get_doc, self()},
+    receive
+    {doc, Doc} ->
+        Fun({ok, Doc});
+    {error, Error} ->
+        Fun(Error)
+    end;
+open_doc(Db, Id, Options, Fun) ->
+    Fun(couch_db:open_doc(Db, Id, Options)).
+
+
 update_doc(#httpdb{} = HttpDb, Doc, Options, Type) ->
     #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
     QArgs = case Type of

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=965581&r1=965580&r2=965581&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Mon Jul 19 18:48:07 2010
@@ -108,6 +108,8 @@ convert_options([{<<"filter">>, V} | R])
     [{filter, V} | convert_options(R)];
 convert_options([{<<"query_params">>, V} | R]) ->
     [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    [{doc_ids, V} | convert_options(R)];
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=965581&r1=965580&r2=965581&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Mon Jul 19 18:48:07 2010
@@ -52,20 +52,30 @@ start(Src, Tgt, Options, UserCtx) ->
     % for incremental replication.
     #rep_state{source=Source, target=Target, start_seq=StartSeq} = State =
             init_state(Src, Tgt, Options, UserCtx), 
-    
-    % Create the work queues
-    {ok, ChangesQueue} = couch_work_queue:new(100000, 500),
+
     {ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
-    
-    % this is starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
-    
-    % this starts the missing revs finder, it checks the target for changes
-    % in the ChangesQueue to see if they exist on the target or not. If not, 
-    % adds them to MissingRevsQueue.
-    spawn_missing_revs_finder(self(), Target, ChangesQueue, MissingRevsQueue),
-    
+
+    case couch_util:get_value(doc_ids, Options) of
+    undefined ->
+        {ok, ChangesQueue} = couch_work_queue:new(100000, 500),
+
+        % this starts the _changes reader process. It adds the changes from
+        % the source db to the ChangesQueue.
+        spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
+
+        % this starts the missing revs finder, it checks the target for changes
+        % in the ChangesQueue to see if they exist on the target or not. If not,
+        % adds them to MissingRevsQueue.
+        spawn_missing_revs_finder(self(), Target, ChangesQueue,
+            MissingRevsQueue);
+    DocIds ->
+        lists:foreach(
+            fun(DocId) ->
+                ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, DocId})
+            end, DocIds),
+        couch_work_queue:close(MissingRevsQueue)
+    end,
+
     % This starts the doc copy process. It gets the documents from the
     % MissingRevsQueue, copying them from the source to the target database.
     spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
@@ -73,13 +83,27 @@ start(Src, Tgt, Options, UserCtx) ->
     % This is the checkpoint loop, it updates the replication record in the
     % database every X seconds, so that if the replication is interuppted,
     % it can restart near where it left off.
-    {ok, State2, _Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
+    {ok, State2, Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
             #stats{}),
     couch_api_wrap:db_close(Source),        
     couch_api_wrap:db_close(Target),
-    {ok, State2#rep_state.checkpoint_history}.
+    {ok, get_result(State2, Stats, Options)}.
 
 
+get_result(State, Stats, Options) ->
+    case couch_util:get_value(doc_ids, Options) of
+    undefined ->
+        State#rep_state.checkpoint_history;
+    _DocIdList ->
+        {[
+            {<<"start_time">>, ?l2b(State#rep_state.rep_starttime)},
+            {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
+            {<<"docs_read">>, Stats#stats.docs_read},
+            {<<"docs_written">>, Stats#stats.docs_written},
+            {<<"doc_write_failures">>, Stats#stats.doc_write_failures}
+        ]}
+    end.
+
 
 init_state(Src,Tgt,Options,UserCtx)->    
     {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
@@ -117,8 +141,15 @@ init_state(Src,Tgt,Options,UserCtx)->   
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo)
     },
-    State#rep_state{timer = erlang:start_timer(checkpoint_interval(State),
-            self(), timed_checkpoint)}.
+    State#rep_state{
+        timer = case couch_util:get_value(doc_ids, Options) of
+        undefined ->
+            erlang:start_timer(checkpoint_interval(State),
+                self(), timed_checkpoint);
+        _DocIdList ->
+            nil
+        end
+    }.
 
 
 spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
@@ -209,27 +240,31 @@ doc_copy_loop(Cp, Source, Target, Missin
     case couch_work_queue:dequeue(MissingRevsQueue,1) of
     closed ->
         Cp ! done;
+    {ok, [{doc_id, Id}]} ->
+        couch_api_wrap:open_doc(
+            Source, Id, [], fun(R) -> doc_handler(R, Target, Cp) end),
+        doc_copy_loop(Cp, Source, Target, MissingRevsQueue);
     {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
-        DocFun = fun({ok, Doc}, _) ->
-            % we are called for every rev read on the source
-            Cp ! {add_stat, {#stats.docs_read, 1}},
-            % now write the doc to the target.
-            case couch_api_wrap:update_doc(Target, Doc, [],
-                replicated_changes) of
-            {ok, _} ->
-                Cp ! {add_stat, {#stats.docs_written, 1}};
-            _Error ->
-                Cp ! {add_stat, {#stats.doc_write_failures, 1}}
-            end;
-        (_, _) ->
-            ok
-        end,
-        couch_api_wrap:open_doc_revs(Source, Id, Revs,
-            [{atts_since, PossibleAncestors}], DocFun, []),
+        couch_api_wrap:open_doc_revs(
+            Source, Id, Revs, [{atts_since, PossibleAncestors}],
+            fun(R, _) -> doc_handler(R, Target, Cp) end, []),
         Cp ! {seq_changes_done, {Seq, length(Revs)}},
         doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
     end.
 
+doc_handler({ok, Doc}, Target, Cp) ->
+    % we are called for every rev read on the source
+    Cp ! {add_stat, {#stats.docs_read, 1}},
+    % now write the doc to the target.
+    case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    {ok, _} ->
+        Cp ! {add_stat, {#stats.docs_written, 1}};
+    _Error ->
+        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+    end;
+doc_handler(_, _, _) ->
+    ok.
+
 checkpoint_loop(State, SeqsInProgress, Stats) ->
     % SeqsInProgress contains the number of revs for each seq found by the
     % changes process.
@@ -270,7 +305,7 @@ checkpoint_loop(State, SeqsInProgress, S
         % Assert that all the seqs have been processed
         0 = gb_trees:size(SeqsInProgress),
         State2 = do_checkpoint(State, Stats),
-        erlang:cancel_timer(State2#rep_state.timer),
+        cancel_timer(State2),
         receive timed_checkpoint -> ok
         after 0 -> ok
         end,
@@ -283,6 +318,11 @@ checkpoint_loop(State, SeqsInProgress, S
         checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats)
     end.
 
+cancel_timer(#rep_state{timer = nil}) ->
+    ok;
+cancel_timer(#rep_state{timer = Timer}) ->
+    erlang:cancel_timer(Timer).
+
 
 checkpoint_interval(_State) ->
     5000.



Mime
View raw message