couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kxe...@apache.org
Subject [43/50] couchdb commit: updated refs/heads/1963-eunit to bfb7eb9
Date Mon, 16 Jun 2014 22:53:24 GMT
Port couch_replicator/03-replication-compact.t etap test suite to eunit

Split big test fun into smaller steps, optimize timeouts.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/3fc6facb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/3fc6facb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/3fc6facb

Branch: refs/heads/1963-eunit
Commit: 3fc6facbf62472b666bd955084f9cbdfe398ffa4
Parents: 182fc07
Author: Alexander Shorin <kxepal@apache.org>
Authored: Mon Jun 16 00:29:29 2014 +0400
Committer: Alexander Shorin <kxepal@apache.org>
Committed: Tue Jun 17 02:50:25 2014 +0400

----------------------------------------------------------------------
 src/couch_replicator/Makefile.am                |   2 +-
 .../test/03-replication-compact.t               | 488 -------------------
 .../test/couch_replicator_compact_tests.erl     | 448 +++++++++++++++++
 3 files changed, 449 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/3fc6facb/src/couch_replicator/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am
index 522716c..b93bc16 100644
--- a/src/couch_replicator/Makefile.am
+++ b/src/couch_replicator/Makefile.am
@@ -36,9 +36,9 @@ source_files = \
 	src/couch_replicator.erl
 
 test_files = \
+	test/couch_replicator_compact_tests.erl \
 	test/couch_replicator_httpc_pool_tests.erl \
 	test/couch_replicator_modules_load_tests.erl \
-	test/03-replication-compact.t \
 	test/04-replication-large-atts.t \
 	test/05-replication-many-leaves.t \
 	test/06-doc-missing-stubs.t \

http://git-wip-us.apache.org/repos/asf/couchdb/blob/3fc6facb/src/couch_replicator/test/03-replication-compact.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/03-replication-compact.t b/src/couch_replicator/test/03-replication-compact.t
deleted file mode 100755
index 7c4d38c..0000000
--- a/src/couch_replicator/test/03-replication-compact.t
+++ /dev/null
@@ -1,488 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Verify that compacting databases that are being used as the source or
-% target of a replication doesn't affect the replication and that the
-% replication doesn't hold their reference counters forever.
-
--define(b2l(B), binary_to_list(B)).
-
--record(user_ctx, {
-    name = null,
-    roles = [],
-    handler
-}).
-
--record(db, {
-    main_pid = nil,
-    update_pid = nil,
-    compactor_pid = nil,
-    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
-    fd,
-    updater_fd,
-    fd_ref_counter,
-    header = nil,
-    committed_update_seq,
-    fulldocinfo_by_id_btree,
-    docinfo_by_seq_btree,
-    local_docs_btree,
-    update_seq,
-    name,
-    filepath,
-    validate_doc_funs = [],
-    security = [],
-    security_ptr = nil,
-    user_ctx = #user_ctx{},
-    waiting_delayed_commit = nil,
-    revs_limit = 1000,
-    fsync_options = [],
-    options = [],
-    compression,
-    before_doc_update,
-    after_doc_read
-}).
-
--record(rep, {
-    id,
-    source,
-    target,
-    options,
-    user_ctx,
-    doc_id
-}).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(376),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    couch_server_sup:start_link(test_util:config_files()),
-    ibrowse:start(),
-
-    Pairs = [
-        {source_db_name(), target_db_name()},
-        {{remote, source_db_name()}, target_db_name()},
-        {source_db_name(), {remote, target_db_name()}},
-        {{remote, source_db_name()}, {remote, (target_db_name())}}
-    ],
-
-    lists:foreach(
-        fun({Source, Target}) ->
-            {ok, SourceDb} = create_db(source_db_name()),
-            etap:is(couch_db:is_idle(SourceDb), true,
-                "Source database is idle before starting replication"),
-
-            {ok, TargetDb} = create_db(target_db_name()),
-            etap:is(couch_db:is_idle(TargetDb), true,
-                "Target database is idle before starting replication"),
-
-            {ok, RepPid, RepId} = replicate(Source, Target),
-            check_active_tasks(RepPid, RepId, Source, Target),
-            {ok, DocsWritten} = populate_and_compact_test(
-                RepPid, SourceDb, TargetDb),
-
-            wait_target_in_sync(DocsWritten, TargetDb),
-            check_active_tasks(RepPid, RepId, Source, Target),
-            cancel_replication(RepId, RepPid),
-            compare_dbs(SourceDb, TargetDb),
-
-            delete_db(SourceDb),
-            delete_db(TargetDb),
-            couch_server_sup:stop(),
-            ok = timer:sleep(1000),
-            couch_server_sup:start_link(test_util:config_files())
-        end,
-        Pairs),
-
-    couch_server_sup:stop(),
-    ok.
-
-
-populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
-    etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
-    check_db_alive("source", SourceDb0),
-    check_db_alive("target", TargetDb0),
-
-    Writer = spawn_writer(SourceDb0),
-
-    lists:foldl(
-        fun(_, {SourceDb, TargetDb, DocCount}) ->
-            pause_writer(Writer),
-
-            compact_db("source", SourceDb),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after source database compaction"),
-            check_db_alive("source", SourceDb),
-            check_ref_counter("source", SourceDb),
-
-            compact_db("target", TargetDb),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after target database compaction"),
-            check_db_alive("target", TargetDb),
-            check_ref_counter("target", TargetDb),
-
-            {ok, SourceDb2} = reopen_db(SourceDb),
-            {ok, TargetDb2} = reopen_db(TargetDb),
-
-            resume_writer(Writer),
-            wait_writer(Writer, DocCount),
-
-            compact_db("source", SourceDb2),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after source database compaction"),
-            check_db_alive("source", SourceDb2),
-            pause_writer(Writer),
-            check_ref_counter("source", SourceDb2),
-            resume_writer(Writer),
-
-            compact_db("target", TargetDb2),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after target database compaction"),
-            check_db_alive("target", TargetDb2),
-            pause_writer(Writer),
-            check_ref_counter("target", TargetDb2),
-            resume_writer(Writer),
-
-            {ok, SourceDb3} = reopen_db(SourceDb2),
-            {ok, TargetDb3} = reopen_db(TargetDb2),
-            {SourceDb3, TargetDb3, DocCount + 50}
-        end,
-        {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
-
-    DocsWritten = stop_writer(Writer),
-    {ok, DocsWritten}.
-
-
-check_db_alive(Type, #db{main_pid = Pid}) ->
-    etap:is(is_process_alive(Pid), true,
-        "Local " ++ Type ++ " database main pid is alive").
-
-
-compact_db(Type, #db{name = Name}) ->
-    {ok, Db} = couch_db:open_int(Name, []),
-    {ok, CompactPid} = couch_db:start_compact(Db),
-    MonRef = erlang:monitor(process, CompactPid),
-    receive
-    {'DOWN', MonRef, process, CompactPid, normal} ->
-        ok;
-    {'DOWN', MonRef, process, CompactPid, Reason} ->
-        etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
-            ": " ++ couch_util:to_list(Reason))
-    after 30000 ->
-        etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
-            " didn't finish")
-    end,
-    ok = couch_db:close(Db).
-
-
-check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
-    MonRef = erlang:monitor(process, OldRefCounter),
-    receive
-    {'DOWN', MonRef, process, OldRefCounter, _} ->
-        etap:diag("Old " ++ Type ++ " database ref counter terminated")
-    after 30000 ->
-        etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
-    end,
-    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
-    ok = couch_db:close(Db),
-    etap:isnt(
-        NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
-
-
-reopen_db(#db{name = Name}) ->
-    {ok, Db} = couch_db:open_int(Name, []),
-    ok = couch_db:close(Db),
-    {ok, Db}.
-
-
-wait_target_in_sync(DocCount, #db{name = TargetName}) ->
-    wait_target_in_sync_loop(DocCount, TargetName, 300).
-
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
-    etap:bail("Could not get source and target databases in sync");
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
-    {ok, Target} = couch_db:open_int(TargetName, []),
-    {ok, TargetInfo} = couch_db:get_db_info(Target),
-    ok = couch_db:close(Target),
-    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
-    case TargetDocCount == DocCount of
-    true ->
-        etap:diag("Source and target databases are in sync");
-    false ->
-        ok = timer:sleep(100),
-        wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
-    end.
-
-
-compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
-    {ok, SourceDb} = couch_db:open_int(SourceName, []),
-    {ok, TargetDb} = couch_db:open_int(TargetName, []),
-    Fun = fun(FullDocInfo, _, Acc) ->
-        {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
-        {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
-        DocId = couch_util:get_value(<<"_id">>, Props),
-        DocTarget = case couch_db:open_doc(TargetDb, DocId) of
-        {ok, DocT} ->
-            DocT;
-        Error ->
-            etap:bail("Error opening document '" ++ ?b2l(DocId) ++
-                "' from target: " ++ couch_util:to_list(Error))
-        end,
-        DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
-        case DocTargetJson of
-        DocJson ->
-            ok;
-        _ ->
-            etap:bail("Content from document '" ++ ?b2l(DocId) ++
-                "' differs in target database")
-        end,
-        {ok, Acc}
-    end,
-    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
-    etap:diag("Target database has the same documents as the source database"),
-    ok = couch_db:close(SourceDb),
-    ok = couch_db:close(TargetDb).
-
-
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
-    Source = case Src of
-    {remote, NameSrc} ->
-        <<(db_url(NameSrc))/binary, $/>>;
-    _ ->
-        Src
-    end,
-    Target = case Tgt of
-    {remote, NameTgt} ->
-        <<(db_url(NameTgt))/binary, $/>>;
-    _ ->
-        Tgt
-    end,
-    FullRepId = list_to_binary(BaseId ++ Ext),
-    Pid = list_to_binary(pid_to_list(RepPid)),
-    [RepTask] = couch_task_status:all(),
-    etap:is(couch_util:get_value(pid, RepTask), Pid,
-        "_active_tasks entry has correct pid property"),
-    etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
-        "_active_tasks entry has right replication id"),
-    etap:is(couch_util:get_value(continuous, RepTask), true,
-        "_active_tasks entry has continuous property set to true"),
-    etap:is(couch_util:get_value(source, RepTask), Source,
-        "_active_tasks entry has correct source property"),
-    etap:is(couch_util:get_value(target, RepTask), Target,
-        "_active_tasks entry has correct target property"),
-    etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
-        "_active_tasks entry has integer docs_read property"),
-    etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
-        "_active_tasks entry has integer docs_written property"),
-    etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
-        "_active_tasks entry has integer doc_write_failures property"),
-    etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
-        "_active_tasks entry has integer revisions_checked property"),
-    etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
-        "_active_tasks entry has integer missing_revisions_found property"),
-    etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
-        "_active_tasks entry has integer checkpointed_source_seq property"),
-    etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
-        "_active_tasks entry has integer source_seq property"),
-    Progress = couch_util:get_value(progress, RepTask),
-    etap:is(is_integer(Progress), true,
-        "_active_tasks entry has an integer progress property"),
-    etap:is(Progress =< 100, true, "Progress is not greater than 100%").
-
-
-wait_writer(Pid, NumDocs) ->
-    case get_writer_num_docs_written(Pid) of
-    N when N >= NumDocs ->
-        ok;
-    _ ->
-        wait_writer(Pid, NumDocs)
-    end.
-
-
-spawn_writer(Db) ->
-    Parent = self(),
-    Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
-    etap:diag("Started source database writer"),
-    Pid.
-
-
-pause_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {pause, Ref},
-    receive
-    {paused, Ref} ->
-        ok
-    after 30000 ->
-        etap:bail("Failed to pause source database writer")
-    end.
-
-
-resume_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {continue, Ref},
-    receive
-    {ok, Ref} ->
-        ok
-    after 30000 ->
-        etap:bail("Failed to unpause source database writer")
-    end.
-
-
-get_writer_num_docs_written(Pid) ->
-    Ref = make_ref(),
-    Pid ! {get_count, Ref},
-    receive
-    {count, Ref, Count} ->
-        Count
-    after 30000 ->
-        etap:bail("Timeout getting number of documents written from "
-            "source database writer")
-    end.
-
-
-stop_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {stop, Ref},
-    receive
-    {stopped, Ref, DocsWritten} ->
-        MonRef = erlang:monitor(process, Pid),
-        receive
-        {'DOWN', MonRef, process, Pid, _Reason} ->
-            etap:diag("Stopped source database writer"),
-            DocsWritten
-        after 30000 ->
-            etap:bail("Timeout stopping source database writer")
-        end
-    after 30000 ->
-        etap:bail("Timeout stopping source database writer")
-    end.
-
-
-writer_loop(#db{name = DbName}, Parent, Counter) ->
-    maybe_pause(Parent, Counter),
-    Doc = couch_doc:from_json_obj({[
-        {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
-        {<<"value">>, Counter + 1},
-        {<<"_attachments">>, {[
-            {<<"icon1.png">>, {[
-                {<<"data">>, base64:encode(att_data())},
-                {<<"content_type">>, <<"image/png">>}
-            ]}},
-            {<<"icon2.png">>, {[
-                {<<"data">>, base64:encode(iolist_to_binary(
-                    [att_data(), att_data()]))},
-                {<<"content_type">>, <<"image/png">>}
-            ]}}
-        ]}}
-    ]}),
-    maybe_pause(Parent, Counter),
-    {ok, Db} = couch_db:open_int(DbName, []),
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    ok = couch_db:close(Db),
-    receive
-    {get_count, Ref} ->
-        Parent ! {count, Ref, Counter + 1},
-        writer_loop(Db, Parent, Counter + 1);
-    {stop, Ref} ->
-        Parent ! {stopped, Ref, Counter + 1}
-    after 0 ->
-        ok = timer:sleep(500),
-        writer_loop(Db, Parent, Counter + 1)
-    end.
-
-
-maybe_pause(Parent, Counter) ->
-    receive
-    {get_count, Ref} ->
-        Parent ! {count, Ref, Counter};
-    {pause, Ref} ->
-        Parent ! {paused, Ref},
-        receive {continue, Ref2} -> Parent ! {ok, Ref2} end
-    after 0 ->
-        ok
-    end.
-
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-
-create_db(DbName) ->
-    {ok, Db} = couch_db:create(
-        DbName,
-        [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
-    couch_db:close(Db),
-    {ok, Db}.
-
-
-delete_db(#db{name = DbName, main_pid = Pid}) ->
-    ok = couch_server:delete(
-        DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-    {'DOWN', MonRef, process, Pid, _Reason} ->
-        ok
-    after 30000 ->
-        etap:bail("Timeout deleting database")
-    end.
-
-
-replicate({remote, Db}, Target) ->
-    replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
-    replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target},
-        {<<"continuous">>, true}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
-        RepObject, #user_ctx{roles = [<<"_admin">>]}),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    {ok, Pid, Rep#rep.id}.
-
-
-cancel_replication(RepId, RepPid) ->
-    {ok, _} = couch_replicator:cancel_replication(RepId),
-    etap:is(is_process_alive(RepPid), false,
-        "Replication process is no longer alive after cancel").
-
-
-att_data() ->
-    {ok, Data} = file:read_file(
-        test_util:source_file("share/www/image/logo.png")),
-    Data.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/3fc6facb/src/couch_replicator/test/couch_replicator_compact_tests.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl
new file mode 100644
index 0000000..4e019fe
--- /dev/null
+++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl
@@ -0,0 +1,448 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_compact_tests).
+
+-include("../../../test/couchdb/couch_eunit.hrl").
+-include_lib("couchdb/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+-define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}).
+-define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}).
+-define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
+-define(DELAY, 100).
+-define(TIMEOUT, 30000).
+-define(TIMEOUT_STOP, 1000).
+-define(TIMEOUT_WRITER, 3000).
+-define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 5).
+
+setup() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    ok = couch_db:close(Db),
+    DbName.
+
+setup(local) ->
+    setup();
+setup(remote) ->
+    {remote, setup()};
+setup({A, B}) ->
+    {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN),
+    Source = setup(A),
+    Target = setup(B),
+    {Source, Target}.
+
+teardown({remote, DbName}) ->
+    teardown(DbName);
+teardown(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok.
+
+teardown(_, {Source, Target}) ->
+    teardown(Source),
+    teardown(Target),
+
+    Pid = whereis(couch_server_sup),
+    erlang:monitor(process, Pid),
+    couch_server_sup:stop(),
+    receive
+        {'DOWN', _, _, Pid, _} ->
+            ok
+    after ?TIMEOUT_STOP ->
+        throw({timeout, server_stop})
+    end.
+
+
+compact_test_() ->
+    Pairs = [{local, local}, {local, remote},
+             {remote, local}, {remote, remote}],
+    {
+        "Compaction during replication tests",
+        {
+            foreachx,
+            fun setup/1, fun teardown/2,
+            [{Pair, fun should_populate_replicate_compact/2}
+             || Pair <- Pairs]
+        }
+    }.
+
+
+should_populate_replicate_compact({From, To}, {Source, Target}) ->
+    {ok, RepPid, RepId} = replicate(Source, Target),
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [
+         should_run_replication(RepPid, RepId, Source, Target),
+         should_all_processes_be_alive(RepPid, Source, Target),
+         should_populate_and_compact(RepPid, Source, Target, 50, 5),
+         should_wait_target_in_sync(Source, Target),
+         should_ensure_replication_still_running(RepPid, RepId, Source, Target),
+         should_cancel_replication(RepId, RepPid),
+         should_compare_databases(Source, Target)
+     ]}}.
+
+should_all_processes_be_alive(RepPid, Source, Target) ->
+    ?_test(begin
+        {ok, SourceDb} = reopen_db(Source),
+        {ok, TargetDb} = reopen_db(Target),
+        ?assert(is_process_alive(RepPid)),
+        ?assert(is_process_alive(SourceDb#db.main_pid)),
+        ?assert(is_process_alive(TargetDb#db.main_pid))
+    end).
+
+should_run_replication(RepPid, RepId, Source, Target) ->
+    ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
+
+should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
+    ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
+
+check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+    Source = case Src of
+        {remote, NameSrc} ->
+            <<(db_url(NameSrc))/binary, $/>>;
+        _ ->
+            Src
+    end,
+    Target = case Tgt of
+        {remote, NameTgt} ->
+            <<(db_url(NameTgt))/binary, $/>>;
+        _ ->
+            Tgt
+    end,
+    FullRepId = ?l2b(BaseId ++ Ext),
+    Pid = ?l2b(pid_to_list(RepPid)),
+    [RepTask] = couch_task_status:all(),
+    ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
+    ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
+    ?assertEqual(true, couch_util:get_value(continuous, RepTask)),
+    ?assertEqual(Source, couch_util:get_value(source, RepTask)),
+    ?assertEqual(Target, couch_util:get_value(target, RepTask)),
+    ?assert(is_integer(couch_util:get_value(docs_read, RepTask))),
+    ?assert(is_integer(couch_util:get_value(docs_written, RepTask))),
+    ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))),
+    ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))),
+    ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
+    ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
+    ?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
+    Progress = couch_util:get_value(progress, RepTask),
+    ?assert(is_integer(Progress)),
+    ?assert(Progress =< 100).
+
+should_cancel_replication(RepId, RepPid) ->
+    ?_assertNot(begin
+        {ok, _} = couch_replicator:cancel_replication(RepId),
+        is_process_alive(RepPid)
+    end).
+
+should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(begin
+        {ok, SourceDb0} = reopen_db(Source),
+        Writer = spawn_writer(SourceDb0),
+        lists:foreach(
+            fun(N) ->
+                {ok, SourceDb} = reopen_db(Source),
+                {ok, TargetDb} = reopen_db(Target),
+                pause_writer(Writer),
+
+                compact_db("source", SourceDb),
+                ?assert(is_process_alive(RepPid)),
+                ?assert(is_process_alive(SourceDb#db.main_pid)),
+                check_ref_counter("source", SourceDb),
+
+                compact_db("target", TargetDb),
+                ?assert(is_process_alive(RepPid)),
+                ?assert(is_process_alive(TargetDb#db.main_pid)),
+                check_ref_counter("target", TargetDb),
+
+                {ok, SourceDb2} = reopen_db(SourceDb),
+                {ok, TargetDb2} = reopen_db(TargetDb),
+
+                resume_writer(Writer),
+                wait_writer(Writer, BatchSize * N),
+
+                compact_db("source", SourceDb2),
+                ?assert(is_process_alive(RepPid)),
+                ?assert(is_process_alive(SourceDb2#db.main_pid)),
+                pause_writer(Writer),
+                check_ref_counter("source", SourceDb2),
+                resume_writer(Writer),
+
+                compact_db("target", TargetDb2),
+                ?assert(is_process_alive(RepPid)),
+                ?assert(is_process_alive(TargetDb2#db.main_pid)),
+                pause_writer(Writer),
+                check_ref_counter("target", TargetDb2),
+                resume_writer(Writer)
+            end, lists:seq(1, Rounds)),
+        stop_writer(Writer)
+    end)}.
+
+should_wait_target_in_sync({remote, Source}, Target) ->
+    should_wait_target_in_sync(Source, Target);
+should_wait_target_in_sync(Source, {remote, Target}) ->
+    should_wait_target_in_sync(Source, Target);
+should_wait_target_in_sync(Source, Target) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_assert(begin
+        {ok, SourceDb} = couch_db:open_int(Source, []),
+        {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+        ok = couch_db:close(SourceDb),
+        SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+        wait_target_in_sync_loop(SourceDocCount, Target, 300)
+    end)}.
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    erlang:error(
+        {assertion_failed,
+         [{module, ?MODULE}, {line, ?LINE},
+          {reason, "Could not get source and target databases in sync"}]});
+wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) ->
+    wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft);
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    {ok, Target} = couch_db:open_int(TargetName, []),
+    {ok, TargetInfo} = couch_db:get_db_info(Target),
+    ok = couch_db:close(Target),
+    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+    case TargetDocCount == DocCount of
+        true ->
+            true;
+        false ->
+            ok = timer:sleep(?DELAY),
+            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+should_compare_databases({remote, Source}, Target) ->
+    should_compare_databases(Source, Target);
+should_compare_databases(Source, {remote, Target}) ->
+    should_compare_databases(Source, Target);
+should_compare_databases(Source, Target) ->
+    {timeout, 35, ?_test(begin
+        {ok, SourceDb} = couch_db:open_int(Source, []),
+        {ok, TargetDb} = couch_db:open_int(Target, []),
+        Fun = fun(FullDocInfo, _, Acc) ->
+            {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
+            {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
+            DocId = couch_util:get_value(<<"_id">>, Props),
+            DocTarget = case couch_db:open_doc(TargetDb, DocId) of
+                {ok, DocT} ->
+                    DocT;
+                Error ->
+                    erlang:error(
+                        {assertion_failed,
+                         [{module, ?MODULE}, {line, ?LINE},
+                          {reason, lists:concat(["Error opening document '",
+                                                 ?b2l(DocId), "' from target: ",
+                                                 couch_util:to_list(Error)])}]})
+            end,
+            DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
+            ?assertEqual(DocJson, DocTargetJson),
+            {ok, Acc}
+        end,
+        {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+        ok = couch_db:close(SourceDb),
+        ok = couch_db:close(TargetDb)
+    end)}.
+
+
+reopen_db({remote, Db}) ->
+    reopen_db(Db);
+reopen_db(#db{name=DbName}) ->
+    reopen_db(DbName);
+reopen_db(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    ok = couch_db:close(Db),
+    {ok, Db}.
+
+compact_db(Type, #db{name = Name}) ->
+    {ok, Db} = couch_db:open_int(Name, []),
+    {ok, CompactPid} = couch_db:start_compact(Db),
+    MonRef = erlang:monitor(process, CompactPid),
+    receive
+        {'DOWN', MonRef, process, CompactPid, normal} ->
+            ok;
+        {'DOWN', MonRef, process, CompactPid, Reason} ->
+            erlang:error(
+                {assertion_failed,
+                 [{module, ?MODULE}, {line, ?LINE},
+                  {reason,
+                   lists:concat(["Error compacting ", Type, " database ",
+                                 ?b2l(Name), ": ",
+                                 couch_util:to_list(Reason)])}]})
+    after ?TIMEOUT ->
+        erlang:error(
+            {assertion_failed,
+             [{module, ?MODULE}, {line, ?LINE},
+              {reason, lists:concat(["Compaction for ", Type, " database ",
+                                     ?b2l(Name), " didn't finish"])}]})
+    end,
+    ok = couch_db:close(Db).
+
+check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
+    MonRef = erlang:monitor(process, OldRefCounter),
+    receive
+        {'DOWN', MonRef, process, OldRefCounter, _} ->
+            ok
+        after ?TIMEOUT ->
+            erlang:error(
+                {assertion_failed,
+                 [{module, ?MODULE}, {line, ?LINE},
+                  {reason, lists:concat(["Old ", Type,
+                                         " database ref counter didn't"
+                                         " terminate"])}]})
+    end,
+    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
+    ok = couch_db:close(Db),
+    ?assertNotEqual(OldRefCounter, NewRefCounter).
+
+db_url(DbName) ->
+    iolist_to_binary([
+        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
+        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+        "/", DbName
+    ]).
+
+replicate({remote, Db}, Target) ->
+    replicate(db_url(Db), Target);
+
+replicate(Source, {remote, Db}) ->
+    replicate(Source, db_url(Db));
+
+replicate(Source, Target) ->
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target},
+        {<<"continuous">>, true}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    {ok, Pid, Rep#rep.id}.
+
+
+wait_writer(Pid, NumDocs) ->
+    case get_writer_num_docs_written(Pid) of
+        N when N >= NumDocs ->
+            ok;
+        _ ->
+            wait_writer(Pid, NumDocs)
+    end.
+
+spawn_writer(Db) ->
+    Parent = self(),
+    Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
+    Pid.
+
+
+pause_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {pause, Ref},
+    receive
+        {paused, Ref} ->
+            ok
+    after ?TIMEOUT_WRITER ->
+        erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Failed to pause source database writer"}]})
+    end.
+
+resume_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {continue, Ref},
+    receive
+        {ok, Ref} ->
+            ok
+    after ?TIMEOUT_WRITER ->
+        erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Failed to pause source database writer"}]})
+    end.
+
+get_writer_num_docs_written(Pid) ->
+    Ref = make_ref(),
+    Pid ! {get_count, Ref},
+    receive
+        {count, Ref, Count} ->
+            Count
+    after ?TIMEOUT_WRITER ->
+        erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Timeout getting number of documents written"
+                                " from source database writer"}]})
+    end.
+
+stop_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {stop, Ref},
+    receive
+        {stopped, Ref, DocsWritten} ->
+            MonRef = erlang:monitor(process, Pid),
+            receive
+                {'DOWN', MonRef, process, Pid, _Reason} ->
+                    DocsWritten
+            after ?TIMEOUT ->
+                erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Timeout stopping source database writer"}]})
+            end
+    after ?TIMEOUT_WRITER ->
+        erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Timeout stopping source database writer"}]})
+    end.
+
+writer_loop(#db{name = DbName}, Parent, Counter) ->
+    {ok, Data} = file:read_file(?ATTFILE),
+    maybe_pause(Parent, Counter),
+    Doc = couch_doc:from_json_obj({[
+        {<<"_id">>, ?l2b(integer_to_list(Counter + 1))},
+        {<<"value">>, Counter + 1},
+        {<<"_attachments">>, {[
+            {<<"icon1.png">>, {[
+                {<<"data">>, base64:encode(Data)},
+                {<<"content_type">>, <<"image/png">>}
+            ]}},
+            {<<"icon2.png">>, {[
+                {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))},
+                {<<"content_type">>, <<"image/png">>}
+            ]}}
+        ]}}
+    ]}),
+    maybe_pause(Parent, Counter),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, _} = couch_db:update_doc(Db, Doc, []),
+    ok = couch_db:close(Db),
+    receive
+        {get_count, Ref} ->
+            Parent ! {count, Ref, Counter + 1},
+            writer_loop(Db, Parent, Counter + 1);
+        {stop, Ref} ->
+            Parent ! {stopped, Ref, Counter + 1}
+    after 0 ->
+        timer:sleep(?DELAY),
+        writer_loop(Db, Parent, Counter + 1)
+    end.
+
+maybe_pause(Parent, Counter) ->
+    receive
+        {get_count, Ref} ->
+            Parent ! {count, Ref, Counter};
+        {pause, Ref} ->
+            Parent ! {paused, Ref},
+            receive
+                {continue, Ref2} ->
+                    Parent ! {ok, Ref2}
+            end
+    after 0 ->
+        ok
+    end.


Mime
View raw message