Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1491A1112E for ; Fri, 5 Sep 2014 21:10:50 +0000 (UTC) Received: (qmail 11416 invoked by uid 500); 5 Sep 2014 21:10:49 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 11319 invoked by uid 500); 5 Sep 2014 21:10:49 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 11156 invoked by uid 99); 5 Sep 2014 21:10:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Sep 2014 21:10:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DA1CCA0B383; Fri, 5 Sep 2014 21:10:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Fri, 05 Sep 2014 21:10:49 -0000 Message-Id: <0cba33935fc84e08ac818c65569b666d@git.apache.org> In-Reply-To: <1564fe69f5b34d54a8360e2e320a51a5@git.apache.org> References: <1564fe69f5b34d54a8360e2e320a51a5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/14] couch-replicator commit: updated refs/heads/master to 3cf0b13 Move files out of test/couchdb into top level test/ folder Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/2850b946 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/2850b946 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/2850b946 Branch: refs/heads/master Commit: 
2850b94675d15974cad8bf98ac6cf4df63577f5c Parents: ba8c397 Author: Russell Branca Authored: Wed Aug 27 15:31:54 2014 -0700 Committer: Russell Branca Committed: Thu Aug 28 10:29:40 2014 -0700 ---------------------------------------------------------------------- .../test/couch_replicator_compact_tests.erl | 448 ------------------- .../test/couch_replicator_httpc_pool_tests.erl | 189 -------- .../test/couch_replicator_large_atts_tests.erl | 218 --------- .../test/couch_replicator_many_leaves_tests.erl | 232 ---------- .../couch_replicator_missing_stubs_tests.erl | 260 ----------- .../couch_replicator_modules_load_tests.erl | 40 -- .../couch_replicator_use_checkpoints_tests.erl | 200 --------- test/couch_replicator_compact_tests.erl | 448 +++++++++++++++++++ test/couch_replicator_httpc_pool_tests.erl | 189 ++++++++ test/couch_replicator_large_atts_tests.erl | 218 +++++++++ test/couch_replicator_many_leaves_tests.erl | 232 ++++++++++ test/couch_replicator_missing_stubs_tests.erl | 260 +++++++++++ test/couch_replicator_modules_load_tests.erl | 40 ++ test/couch_replicator_use_checkpoints_tests.erl | 200 +++++++++ 14 files changed, 1587 insertions(+), 1587 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_compact_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl deleted file mode 100644 index 4e019fe..0000000 --- a/src/couch_replicator/test/couch_replicator_compact_tests.erl +++ /dev/null @@ -1,448 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. 
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_compact_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). --include_lib("couch_replicator/src/couch_replicator.hrl"). - --define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). --define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). --define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])). --define(DELAY, 100). --define(TIMEOUT, 30000). --define(TIMEOUT_STOP, 1000). --define(TIMEOUT_WRITER, 3000). --define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 5). - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), - ok = couch_db:close(Db), - DbName. - -setup(local) -> - setup(); -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), - Source = setup(A), - Target = setup(B), - {Source, Target}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_USER]), - ok. - -teardown(_, {Source, Target}) -> - teardown(Source), - teardown(Target), - - Pid = whereis(couch_server_sup), - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT_STOP -> - throw({timeout, server_stop}) - end. - - -compact_test_() -> - Pairs = [{local, local}, {local, remote}, - {remote, local}, {remote, remote}], - { - "Compaction during replication tests", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] - } - }. 
- - -should_populate_replicate_compact({From, To}, {Source, Target}) -> - {ok, RepPid, RepId} = replicate(Source, Target), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_run_replication(RepPid, RepId, Source, Target), - should_all_processes_be_alive(RepPid, Source, Target), - should_populate_and_compact(RepPid, Source, Target, 50, 5), - should_wait_target_in_sync(Source, Target), - should_ensure_replication_still_running(RepPid, RepId, Source, Target), - should_cancel_replication(RepId, RepPid), - should_compare_databases(Source, Target) - ]}}. - -should_all_processes_be_alive(RepPid, Source, Target) -> - ?_test(begin - {ok, SourceDb} = reopen_db(Source), - {ok, TargetDb} = reopen_db(Target), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), - ?assert(is_process_alive(TargetDb#db.main_pid)) - end). - -should_run_replication(RepPid, RepId, Source, Target) -> - ?_test(check_active_tasks(RepPid, RepId, Source, Target)). - -should_ensure_replication_still_running(RepPid, RepId, Source, Target) -> - ?_test(check_active_tasks(RepPid, RepId, Source, Target)). 
- -check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) -> - Source = case Src of - {remote, NameSrc} -> - <<(db_url(NameSrc))/binary, $/>>; - _ -> - Src - end, - Target = case Tgt of - {remote, NameTgt} -> - <<(db_url(NameTgt))/binary, $/>>; - _ -> - Tgt - end, - FullRepId = ?l2b(BaseId ++ Ext), - Pid = ?l2b(pid_to_list(RepPid)), - [RepTask] = couch_task_status:all(), - ?assertEqual(Pid, couch_util:get_value(pid, RepTask)), - ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)), - ?assertEqual(true, couch_util:get_value(continuous, RepTask)), - ?assertEqual(Source, couch_util:get_value(source, RepTask)), - ?assertEqual(Target, couch_util:get_value(target, RepTask)), - ?assert(is_integer(couch_util:get_value(docs_read, RepTask))), - ?assert(is_integer(couch_util:get_value(docs_written, RepTask))), - ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))), - ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))), - ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))), - ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))), - ?assert(is_integer(couch_util:get_value(source_seq, RepTask))), - Progress = couch_util:get_value(progress, RepTask), - ?assert(is_integer(Progress)), - ?assert(Progress =< 100). - -should_cancel_replication(RepId, RepPid) -> - ?_assertNot(begin - {ok, _} = couch_replicator:cancel_replication(RepId), - is_process_alive(RepPid) - end). 
- -should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb0} = reopen_db(Source), - Writer = spawn_writer(SourceDb0), - lists:foreach( - fun(N) -> - {ok, SourceDb} = reopen_db(Source), - {ok, TargetDb} = reopen_db(Target), - pause_writer(Writer), - - compact_db("source", SourceDb), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), - check_ref_counter("source", SourceDb), - - compact_db("target", TargetDb), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb#db.main_pid)), - check_ref_counter("target", TargetDb), - - {ok, SourceDb2} = reopen_db(SourceDb), - {ok, TargetDb2} = reopen_db(TargetDb), - - resume_writer(Writer), - wait_writer(Writer, BatchSize * N), - - compact_db("source", SourceDb2), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb2#db.main_pid)), - pause_writer(Writer), - check_ref_counter("source", SourceDb2), - resume_writer(Writer), - - compact_db("target", TargetDb2), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb2#db.main_pid)), - pause_writer(Writer), - check_ref_counter("target", TargetDb2), - resume_writer(Writer) - end, lists:seq(1, Rounds)), - stop_writer(Writer) - end)}. - -should_wait_target_in_sync({remote, Source}, Target) -> - should_wait_target_in_sync(Source, Target); -should_wait_target_in_sync(Source, {remote, Target}) -> - should_wait_target_in_sync(Source, Target); -should_wait_target_in_sync(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_assert(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, SourceInfo} = couch_db:get_db_info(SourceDb), - ok = couch_db:close(SourceDb), - SourceDocCount = couch_util:get_value(doc_count, SourceInfo), - wait_target_in_sync_loop(SourceDocCount, Target, 300) - end)}. 
- -wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, "Could not get source and target databases in sync"}]}); -wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) -> - wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft); -wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> - {ok, Target} = couch_db:open_int(TargetName, []), - {ok, TargetInfo} = couch_db:get_db_info(Target), - ok = couch_db:close(Target), - TargetDocCount = couch_util:get_value(doc_count, TargetInfo), - case TargetDocCount == DocCount of - true -> - true; - false -> - ok = timer:sleep(?DELAY), - wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) - end. - -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, 35, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, _, Acc) -> - {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), - {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - DocTarget = case couch_db:open_doc(TargetDb, DocId) of - {ok, DocT} -> - DocT; - Error -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Error opening document '", - ?b2l(DocId), "' from target: ", - couch_util:to_list(Error)])}]}) - end, - DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), - ?assertEqual(DocJson, DocTargetJson), - {ok, Acc} - end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb) - end)}. 
- - -reopen_db({remote, Db}) -> - reopen_db(Db); -reopen_db(#db{name=DbName}) -> - reopen_db(DbName); -reopen_db(DbName) -> - {ok, Db} = couch_db:open_int(DbName, []), - ok = couch_db:close(Db), - {ok, Db}. - -compact_db(Type, #db{name = Name}) -> - {ok, Db} = couch_db:open_int(Name, []), - {ok, CompactPid} = couch_db:start_compact(Db), - MonRef = erlang:monitor(process, CompactPid), - receive - {'DOWN', MonRef, process, CompactPid, normal} -> - ok; - {'DOWN', MonRef, process, CompactPid, Reason} -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, - lists:concat(["Error compacting ", Type, " database ", - ?b2l(Name), ": ", - couch_util:to_list(Reason)])}]}) - after ?TIMEOUT -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Compaction for ", Type, " database ", - ?b2l(Name), " didn't finish"])}]}) - end, - ok = couch_db:close(Db). - -check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) -> - MonRef = erlang:monitor(process, OldRefCounter), - receive - {'DOWN', MonRef, process, OldRefCounter, _} -> - ok - after ?TIMEOUT -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Old ", Type, - " database ref counter didn't" - " terminate"])}]}) - end, - {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []), - ok = couch_db:close(Db), - ?assertNotEqual(OldRefCounter, NewRefCounter). - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). 
- -replicate({remote, Db}, Target) -> - replicate(db_url(Db), Target); - -replicate(Source, {remote, Db}) -> - replicate(Source, db_url(Db)); - -replicate(Source, Target) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"continuous">>, true} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), - {ok, Pid} = couch_replicator:async_replicate(Rep), - {ok, Pid, Rep#rep.id}. - - -wait_writer(Pid, NumDocs) -> - case get_writer_num_docs_written(Pid) of - N when N >= NumDocs -> - ok; - _ -> - wait_writer(Pid, NumDocs) - end. - -spawn_writer(Db) -> - Parent = self(), - Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end), - Pid. - - -pause_writer(Pid) -> - Ref = make_ref(), - Pid ! {pause, Ref}, - receive - {paused, Ref} -> - ok - after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Failed to pause source database writer"}]}) - end. - -resume_writer(Pid) -> - Ref = make_ref(), - Pid ! {continue, Ref}, - receive - {ok, Ref} -> - ok - after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Failed to pause source database writer"}]}) - end. - -get_writer_num_docs_written(Pid) -> - Ref = make_ref(), - Pid ! {get_count, Ref}, - receive - {count, Ref, Count} -> - Count - after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout getting number of documents written" - " from source database writer"}]}) - end. - -stop_writer(Pid) -> - Ref = make_ref(), - Pid ! 
{stop, Ref}, - receive - {stopped, Ref, DocsWritten} -> - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _Reason} -> - DocsWritten - after ?TIMEOUT -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout stopping source database writer"}]}) - end - after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout stopping source database writer"}]}) - end. - -writer_loop(#db{name = DbName}, Parent, Counter) -> - {ok, Data} = file:read_file(?ATTFILE), - maybe_pause(Parent, Counter), - Doc = couch_doc:from_json_obj({[ - {<<"_id">>, ?l2b(integer_to_list(Counter + 1))}, - {<<"value">>, Counter + 1}, - {<<"_attachments">>, {[ - {<<"icon1.png">>, {[ - {<<"data">>, base64:encode(Data)}, - {<<"content_type">>, <<"image/png">>} - ]}}, - {<<"icon2.png">>, {[ - {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))}, - {<<"content_type">>, <<"image/png">>} - ]}} - ]}} - ]}), - maybe_pause(Parent, Counter), - {ok, Db} = couch_db:open_int(DbName, []), - {ok, _} = couch_db:update_doc(Db, Doc, []), - ok = couch_db:close(Db), - receive - {get_count, Ref} -> - Parent ! {count, Ref, Counter + 1}, - writer_loop(Db, Parent, Counter + 1); - {stop, Ref} -> - Parent ! {stopped, Ref, Counter + 1} - after 0 -> - timer:sleep(?DELAY), - writer_loop(Db, Parent, Counter + 1) - end. - -maybe_pause(Parent, Counter) -> - receive - {get_count, Ref} -> - Parent ! {count, Ref, Counter}; - {pause, Ref} -> - Parent ! {paused, Ref}, - receive - {continue, Ref2} -> - Parent ! {ok, Ref2} - end - after 0 -> - ok - end. 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl b/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl deleted file mode 100644 index 6bede4c..0000000 --- a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl +++ /dev/null @@ -1,189 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_httpc_pool_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). - --define(ADMIN_USER, {user_ctx, #user_ctx{roles=[<<"_admin">>]}}). --define(TIMEOUT, 1000). - - -start() -> - {ok, Pid} = couch_server_sup:start_link(?CONFIG_CHAIN), - Pid. - -stop(Pid) -> - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT -> - throw({timeout, server_stop}) - end. - -setup() -> - spawn_pool(). - -teardown(Pool) -> - stop_pool(Pool). - - -httpc_pool_test_() -> - { - "httpc pool tests", - { - setup, - fun start/0, fun stop/1, - { - foreach, - fun setup/0, fun teardown/1, - [ - fun should_block_new_clients_when_full/1, - fun should_replace_worker_on_death/1 - ] - } - } - }. 
- - -should_block_new_clients_when_full(Pool) -> - ?_test(begin - Client1 = spawn_client(Pool), - Client2 = spawn_client(Pool), - Client3 = spawn_client(Pool), - - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), - ?assertEqual(ok, ping_client(Client3)), - - Worker1 = get_client_worker(Client1, "1"), - Worker2 = get_client_worker(Client2, "2"), - Worker3 = get_client_worker(Client3, "3"), - - ?assert(is_process_alive(Worker1)), - ?assert(is_process_alive(Worker2)), - ?assert(is_process_alive(Worker3)), - - ?assertNotEqual(Worker1, Worker2), - ?assertNotEqual(Worker2, Worker3), - ?assertNotEqual(Worker3, Worker1), - - Client4 = spawn_client(Pool), - ?assertEqual(timeout, ping_client(Client4)), - - ?assertEqual(ok, stop_client(Client1)), - ?assertEqual(ok, ping_client(Client4)), - - Worker4 = get_client_worker(Client4, "4"), - ?assertEqual(Worker1, Worker4), - - lists:foreach( - fun(C) -> - ?assertEqual(ok, stop_client(C)) - end, [Client2, Client3, Client4]) - end). - -should_replace_worker_on_death(Pool) -> - ?_test(begin - Client1 = spawn_client(Pool), - ?assertEqual(ok, ping_client(Client1)), - Worker1 = get_client_worker(Client1, "1"), - ?assert(is_process_alive(Worker1)), - - ?assertEqual(ok, kill_client_worker(Client1)), - ?assertNot(is_process_alive(Worker1)), - ?assertEqual(ok, stop_client(Client1)), - - Client2 = spawn_client(Pool), - ?assertEqual(ok, ping_client(Client2)), - Worker2 = get_client_worker(Client2, "2"), - ?assert(is_process_alive(Worker2)), - - ?assertNotEqual(Worker1, Worker2), - ?assertEqual(ok, stop_client(Client2)) - end). - - -spawn_client(Pool) -> - Parent = self(), - Ref = make_ref(), - Pid = spawn(fun() -> - {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool), - loop(Parent, Ref, Worker, Pool) - end), - {Pid, Ref}. - -ping_client({Pid, Ref}) -> - Pid ! ping, - receive - {pong, Ref} -> - ok - after ?TIMEOUT -> - timeout - end. - -get_client_worker({Pid, Ref}, ClientName) -> - Pid ! 
get_worker, - receive - {worker, Ref, Worker} -> - Worker - after ?TIMEOUT -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, "Timeout getting client " ++ ClientName ++ " worker"}]}) - end. - -stop_client({Pid, Ref}) -> - Pid ! stop, - receive - {stop, Ref} -> - ok - after ?TIMEOUT -> - timeout - end. - -kill_client_worker({Pid, Ref}) -> - Pid ! get_worker, - receive - {worker, Ref, Worker} -> - exit(Worker, kill), - ok - after ?TIMEOUT -> - timeout - end. - -loop(Parent, Ref, Worker, Pool) -> - receive - ping -> - Parent ! {pong, Ref}, - loop(Parent, Ref, Worker, Pool); - get_worker -> - Parent ! {worker, Ref, Worker}, - loop(Parent, Ref, Worker, Pool); - stop -> - couch_replicator_httpc_pool:release_worker(Pool, Worker), - Parent ! {stop, Ref} - end. - -spawn_pool() -> - Host = couch_config:get("httpd", "bind_address", "127.0.0.1"), - Port = couch_config:get("httpd", "port", "5984"), - {ok, Pool} = couch_replicator_httpc_pool:start_link( - "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]), - Pool. - -stop_pool(Pool) -> - ok = couch_replicator_httpc_pool:stop(Pool). http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_large_atts_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_large_atts_tests.erl b/src/couch_replicator/test/couch_replicator_large_atts_tests.erl deleted file mode 100644 index ed7ec50..0000000 --- a/src/couch_replicator/test/couch_replicator_large_atts_tests.erl +++ /dev/null @@ -1,218 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. 
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_large_atts_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). - --define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). --define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). --define(ATT_SIZE_1, 2 * 1024 * 1024). --define(ATT_SIZE_2, round(6.6 * 1024 * 1024)). --define(DOCS_COUNT, 11). --define(TIMEOUT_EUNIT, 30). --define(TIMEOUT_STOP, 1000). - - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), - ok = couch_db:close(Db), - DbName. - -setup(local) -> - setup(); -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), - couch_config:set("attachments", "compressible_types", "text/*", false), - Source = setup(A), - Target = setup(B), - {Source, Target}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_USER]), - ok. - -teardown(_, {Source, Target}) -> - teardown(Source), - teardown(Target), - - Pid = whereis(couch_server_sup), - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT_STOP -> - throw({timeout, server_stop}) - end. - - -large_atts_test_() -> - Pairs = [{local, local}, {local, remote}, - {remote, local}, {remote, remote}], - { - "Replicate docs with large attachments", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] - } - }. 
- - -should_populate_replicate_compact({From, To}, {Source, Target}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [should_populate_source(Source), - should_replicate(Source, Target), - should_compare_databases(Source, Target)]}}. - -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, ?DOCS_COUNT))}. - -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. - -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. - - -populate_db(DbName, DocCount) -> - {ok, Db} = couch_db:open_int(DbName, []), - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Doc = #doc{ - id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]), - body = {[]}, - atts = [ - att(<<"att1">>, ?ATT_SIZE_1, <<"text/plain">>), - att(<<"att2">>, ?ATT_SIZE_2, <<"app/binary">>) - ] - }, - [Doc | Acc] - end, - [], lists:seq(1, DocCount)), - {ok, _} = couch_db:update_docs(Db, Docs, []), - couch_db:close(Db). 
- -compare_dbs(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - - Fun = fun(FullDocInfo, _, Acc) -> - {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo), - Id = DocSource#doc.id, - - {ok, DocTarget} = couch_db:open_doc(TargetDb, Id), - ?assertEqual(DocSource#doc.body, DocTarget#doc.body), - - #doc{atts = SourceAtts} = DocSource, - #doc{atts = TargetAtts} = DocTarget, - ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]), - lists:sort([N || #att{name = N} <- TargetAtts])), - - FunCompareAtts = fun(#att{name = AttName} = Att) -> - {ok, AttTarget} = find_att(TargetAtts, AttName), - SourceMd5 = att_md5(Att), - TargetMd5 = att_md5(AttTarget), - case AttName of - <<"att1">> -> - ?assertEqual(gzip, Att#att.encoding), - ?assertEqual(gzip, AttTarget#att.encoding), - DecSourceMd5 = att_decoded_md5(Att), - DecTargetMd5 = att_decoded_md5(AttTarget), - ?assertEqual(DecSourceMd5, DecTargetMd5); - _ -> - ?assertEqual(identity, Att#att.encoding), - ?assertEqual(identity, AttTarget#att.encoding) - end, - ?assertEqual(SourceMd5, TargetMd5), - ?assert(is_integer(Att#att.disk_len)), - ?assert(is_integer(Att#att.att_len)), - ?assert(is_integer(AttTarget#att.disk_len)), - ?assert(is_integer(AttTarget#att.att_len)), - ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len), - ?assertEqual(Att#att.att_len, AttTarget#att.att_len), - ?assertEqual(Att#att.type, AttTarget#att.type), - ?assertEqual(Att#att.md5, AttTarget#att.md5) - end, - - lists:foreach(FunCompareAtts, SourceAtts), - - {ok, Acc} - end, - - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). - -att(Name, Size, Type) -> - #att{ - name = Name, - type = Type, - att_len = Size, - data = fun(Count) -> crypto:rand_bytes(Count) end - }. 
- -find_att([], _Name) -> - nil; -find_att([#att{name = Name} = Att | _], Name) -> - {ok, Att}; -find_att([_ | Rest], Name) -> - find_att(Rest, Name). - -att_md5(Att) -> - Md50 = couch_doc:att_foldl( - Att, - fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end, - couch_util:md5_init()), - couch_util:md5_final(Md50). - -att_decoded_md5(Att) -> - Md50 = couch_doc:att_foldl_decode( - Att, - fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end, - couch_util:md5_init()), - couch_util:md5_final(Md50). - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). - -replicate(Source, Target) -> - RepObject = {[{<<"source">>, Source}, {<<"target">>, Target}]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _} -> - ok - end. http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl b/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl deleted file mode 100644 index 7d0de9e..0000000 --- a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl +++ /dev/null @@ -1,232 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_many_leaves_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). - --define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). --define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). --define(DOCS_CONFLICTS, [ - {<<"doc1">>, 10}, - {<<"doc2">>, 100}, - % a number > MaxURLlength (7000) / length(DocRevisionString) - {<<"doc3">>, 210} -]). --define(NUM_ATTS, 2). --define(TIMEOUT_STOP, 1000). --define(TIMEOUT_EUNIT, 60). --define(i2l(I), integer_to_list(I)). --define(io2b(Io), iolist_to_binary(Io)). - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), - ok = couch_db:close(Db), - DbName. - -setup(local) -> - setup(); -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), - Source = setup(A), - Target = setup(B), - {Source, Target}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_USER]), - ok. - -teardown(_, {Source, Target}) -> - teardown(Source), - teardown(Target), - - Pid = whereis(couch_server_sup), - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT_STOP -> - throw({timeout, server_stop}) - end. - - -docs_with_many_leaves_test_() -> - Pairs = [{local, local}, {local, remote}, - {remote, local}, {remote, remote}], - { - "Replicate documents with many leaves", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] - } - }. 
- - -should_populate_replicate_compact({From, To}, {Source, Target}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source), - should_replicate(Source, Target), - should_verify_target(Source, Target), - should_add_attachments_to_source(Source), - should_replicate(Source, Target), - should_verify_target(Source, Target) - ]}}. - -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}. - -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. - -should_verify_target({remote, Source}, Target) -> - should_verify_target(Source, Target); -should_verify_target(Source, {remote, Target}) -> - should_verify_target(Source, Target); -should_verify_target(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb) - end)}. - -should_add_attachments_to_source({remote, Source}) -> - should_add_attachments_to_source(Source); -should_add_attachments_to_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - add_attachments(SourceDb, ?NUM_ATTS, ?DOCS_CONFLICTS), - ok = couch_db:close(SourceDb) - end)}. 
- -populate_db(DbName) -> - {ok, Db} = couch_db:open_int(DbName, []), - lists:foreach( - fun({DocId, NumConflicts}) -> - Value = <<"0">>, - Doc = #doc{ - id = DocId, - body = {[ {<<"value">>, Value} ]} - }, - {ok, _} = couch_db:update_doc(Db, Doc, []), - {ok, _} = add_doc_siblings(Db, DocId, NumConflicts) - end, ?DOCS_CONFLICTS), - couch_db:close(Db). - -add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 -> - add_doc_siblings(Db, DocId, NumLeaves, [], []). - -add_doc_siblings(Db, _DocId, 0, AccDocs, AccRevs) -> - {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes), - {ok, AccRevs}; - -add_doc_siblings(Db, DocId, NumLeaves, AccDocs, AccRevs) -> - Value = ?l2b(?i2l(NumLeaves)), - Rev = couch_util:md5(Value), - Doc = #doc{ - id = DocId, - revs = {1, [Rev]}, - body = {[ {<<"value">>, Value} ]} - }, - add_doc_siblings(Db, DocId, NumLeaves - 1, - [Doc | AccDocs], [{1, Rev} | AccRevs]). - -verify_target(_SourceDb, _TargetDb, []) -> - ok; -verify_target(SourceDb, TargetDb, [{DocId, NumConflicts} | Rest]) -> - {ok, SourceLookups} = couch_db:open_doc_revs( - SourceDb, - DocId, - all, - [conflicts, deleted_conflicts]), - {ok, TargetLookups} = couch_db:open_doc_revs( - TargetDb, - DocId, - all, - [conflicts, deleted_conflicts]), - SourceDocs = [Doc || {ok, Doc} <- SourceLookups], - TargetDocs = [Doc || {ok, Doc} <- TargetLookups], - Total = NumConflicts + 1, - ?assertEqual(Total, length(TargetDocs)), - lists:foreach( - fun({SourceDoc, TargetDoc}) -> - SourceJson = couch_doc:to_json_obj(SourceDoc, [attachments]), - TargetJson = couch_doc:to_json_obj(TargetDoc, [attachments]), - ?assertEqual(SourceJson, TargetJson) - end, - lists:zip(SourceDocs, TargetDocs)), - verify_target(SourceDb, TargetDb, Rest). 
- -add_attachments(_SourceDb, _NumAtts, []) -> - ok; -add_attachments(SourceDb, NumAtts, [{DocId, NumConflicts} | Rest]) -> - {ok, SourceLookups} = couch_db:open_doc_revs(SourceDb, DocId, all, []), - SourceDocs = [Doc || {ok, Doc} <- SourceLookups], - Total = NumConflicts + 1, - ?assertEqual(Total, length(SourceDocs)), - NewDocs = lists:foldl( - fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) -> - NewAtts = lists:foldl(fun(I, AttAcc) -> - AttData = crypto:rand_bytes(100), - NewAtt = #att{ - name = ?io2b(["att_", ?i2l(I), "_", - couch_doc:rev_to_str({Pos, Rev})]), - type = <<"application/foobar">>, - att_len = byte_size(AttData), - data = AttData - }, - [NewAtt | AttAcc] - end, [], lists:seq(1, NumAtts)), - [Doc#doc{atts = Atts ++ NewAtts} | Acc] - end, - [], SourceDocs), - {ok, UpdateResults} = couch_db:update_docs(SourceDb, NewDocs, []), - NewRevs = [R || {ok, R} <- UpdateResults], - ?assertEqual(length(NewDocs), length(NewRevs)), - add_attachments(SourceDb, NumAtts, Rest). - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). - -replicate(Source, Target) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _} -> - ok - end. 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl b/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl deleted file mode 100644 index 0c2c30d..0000000 --- a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl +++ /dev/null @@ -1,260 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_missing_stubs_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). - --define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). --define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). --define(REVS_LIMIT, 3). --define(TIMEOUT_STOP, 1000). --define(TIMEOUT_EUNIT, 30). - - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), - ok = couch_db:close(Db), - DbName. - -setup(local) -> - setup(); -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), - Source = setup(A), - Target = setup(B), - {Source, Target}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_USER]), - ok. 
- -teardown(_, {Source, Target}) -> - teardown(Source), - teardown(Target), - - Pid = whereis(couch_server_sup), - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT_STOP -> - throw({timeout, server_stop}) - end. - - -missing_stubs_test_() -> - Pairs = [{local, local}, {local, remote}, - {remote, local}, {remote, remote}], - { - "Replicate docs with missing stubs (COUCHDB-1365)", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_replicate_docs_with_missed_att_stubs/2} - || Pair <- Pairs] - } - }. - - -should_replicate_docs_with_missed_att_stubs({From, To}, {Source, Target}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source), - should_set_target_revs_limit(Target, ?REVS_LIMIT), - should_replicate(Source, Target), - should_compare_databases(Source, Target), - should_update_source_docs(Source, ?REVS_LIMIT * 2), - should_replicate(Source, Target), - should_compare_databases(Source, Target) - ]}}. - -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}. - -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. - -should_set_target_revs_limit({remote, Target}, RevsLimit) -> - should_set_target_revs_limit(Target, RevsLimit); -should_set_target_revs_limit(Target, RevsLimit) -> - ?_test(begin - {ok, Db} = couch_db:open_int(Target, [?ADMIN_USER]), - ?assertEqual(ok, couch_db:set_revs_limit(Db, RevsLimit)), - ok = couch_db:close(Db) - end). 
- -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. - -should_update_source_docs({remote, Source}, Times) -> - should_update_source_docs(Source, Times); -should_update_source_docs(Source, Times) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(update_db_docs(Source, Times))}. - - -populate_db(DbName) -> - {ok, Db} = couch_db:open_int(DbName, []), - AttData = crypto:rand_bytes(6000), - Doc = #doc{ - id = <<"doc1">>, - atts = [ - #att{ - name = <<"doc1_att1">>, - type = <<"application/foobar">>, - att_len = byte_size(AttData), - data = AttData - } - ] - }, - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db). - -update_db_docs(DbName, Times) -> - {ok, Db} = couch_db:open_int(DbName, []), - {ok, _, _} = couch_db:enum_docs( - Db, - fun(FDI, _, Acc) -> db_fold_fun(FDI, Acc) end, - {DbName, Times}, - []), - ok = couch_db:close(Db). - -db_fold_fun(FullDocInfo, {DbName, Times}) -> - {ok, Db} = couch_db:open_int(DbName, []), - {ok, Doc} = couch_db:open_doc(Db, FullDocInfo), - lists:foldl( - fun(_, {Pos, RevId}) -> - {ok, Db2} = couch_db:reopen(Db), - NewDocVersion = Doc#doc{ - revs = {Pos, [RevId]}, - body = {[{<<"value">>, base64:encode(crypto:rand_bytes(100))}]} - }, - {ok, NewRev} = couch_db:update_doc(Db2, NewDocVersion, []), - NewRev - end, - {element(1, Doc#doc.revs), hd(element(2, Doc#doc.revs))}, - lists:seq(1, Times)), - ok = couch_db:close(Db), - {ok, {DbName, Times}}. 
- -compare_dbs(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - - Fun = fun(FullDocInfo, _, Acc) -> - {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo, - [conflicts, deleted_conflicts]), - Id = DocSource#doc.id, - - {ok, DocTarget} = couch_db:open_doc(TargetDb, Id, - [conflicts, deleted_conflicts]), - ?assertEqual(DocSource#doc.body, DocTarget#doc.body), - - ?assertEqual(couch_doc:to_json_obj(DocSource, []), - couch_doc:to_json_obj(DocTarget, [])), - - #doc{atts = SourceAtts} = DocSource, - #doc{atts = TargetAtts} = DocTarget, - ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]), - lists:sort([N || #att{name = N} <- TargetAtts])), - - lists:foreach( - fun(#att{name = AttName} = Att) -> - {ok, AttTarget} = find_att(TargetAtts, AttName), - SourceMd5 = att_md5(Att), - TargetMd5 = att_md5(AttTarget), - case AttName of - <<"att1">> -> - ?assertEqual(gzip, Att#att.encoding), - ?assertEqual(gzip, AttTarget#att.encoding), - DecSourceMd5 = att_decoded_md5(Att), - DecTargetMd5 = att_decoded_md5(AttTarget), - ?assertEqual(DecSourceMd5, DecTargetMd5); - _ -> - ?assertEqual(identity, Att#att.encoding), - ?assertEqual(identity, AttTarget#att.encoding) - end, - ?assertEqual(SourceMd5, TargetMd5), - ?assert(is_integer(Att#att.disk_len)), - ?assert(is_integer(Att#att.att_len)), - ?assert(is_integer(AttTarget#att.disk_len)), - ?assert(is_integer(AttTarget#att.att_len)), - ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len), - ?assertEqual(Att#att.att_len, AttTarget#att.att_len), - ?assertEqual(Att#att.type, AttTarget#att.type), - ?assertEqual(Att#att.md5, AttTarget#att.md5) - end, - SourceAtts), - {ok, Acc} - end, - - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). - -find_att([], _Name) -> - nil; -find_att([#att{name = Name} = Att | _], Name) -> - {ok, Att}; -find_att([_ | Rest], Name) -> - find_att(Rest, Name). 
- -att_md5(Att) -> - Md50 = couch_doc:att_foldl( - Att, - fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end, - couch_util:md5_init()), - couch_util:md5_final(Md50). - -att_decoded_md5(Att) -> - Md50 = couch_doc:att_foldl_decode( - Att, - fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end, - couch_util:md5_init()), - couch_util:md5_final(Md50). - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). - -replicate(Source, Target) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _} -> - ok - end. http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_modules_load_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl b/src/couch_replicator/test/couch_replicator_modules_load_tests.erl deleted file mode 100644 index cea4cc2..0000000 --- a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl +++ /dev/null @@ -1,40 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. 
- --module(couch_replicator_modules_load_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). - - -modules_load_test_() -> - { - "Verify that all modules loads", - should_load_modules() - }. - - -should_load_modules() -> - Modules = [ - couch_replicator_api_wrap, - couch_replicator_httpc, - couch_replicator_httpd, - couch_replicator_manager, - couch_replicator_notifier, - couch_replicator, - couch_replicator_worker, - couch_replicator_utils, - couch_replicator_job_sup - ], - [should_load_module(Mod) || Mod <- Modules]. - -should_load_module(Mod) -> - {atom_to_list(Mod), ?_assertMatch({module, _}, code:load_file(Mod))}. http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl deleted file mode 100644 index f09a235..0000000 --- a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl +++ /dev/null @@ -1,200 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_use_checkpoints_tests). - --include("../../../test/couchdb/couch_eunit.hrl"). --include_lib("couchdb/couch_db.hrl"). - --define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). --define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). --define(DOCS_COUNT, 100). 
--define(TIMEOUT_STOP, 1000). --define(TIMEOUT_EUNIT, 30). --define(i2l(I), integer_to_list(I)). --define(io2b(Io), iolist_to_binary(Io)). - - -start(false) -> - fun - ({finished, _, {CheckpointHistory}}) -> - ?assertEqual([{<<"use_checkpoints">>,false}], CheckpointHistory); - (_) -> - ok - end; -start(true) -> - fun - ({finished, _, {CheckpointHistory}}) -> - ?assertNotEqual(false, lists:keyfind(<<"session_id">>, - 1, CheckpointHistory)); - (_) -> - ok - end. - -stop(_, _) -> - ok. - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), - ok = couch_db:close(Db), - DbName. - -setup(local) -> - setup(); -setup(remote) -> - {remote, setup()}; -setup({_, Fun, {A, B}}) -> - {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), - {ok, Listener} = couch_replicator_notifier:start_link(Fun), - Source = setup(A), - Target = setup(B), - {Source, Target, Listener}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_USER]), - ok. - -teardown(_, {Source, Target, Listener}) -> - teardown(Source), - teardown(Target), - - couch_replicator_notifier:stop(Listener), - Pid = whereis(couch_server_sup), - erlang:monitor(process, Pid), - couch_server_sup:stop(), - receive - {'DOWN', _, _, Pid, _} -> - ok - after ?TIMEOUT_STOP -> - throw({timeout, server_stop}) - end. - - -use_checkpoints_test_() -> - { - "Replication use_checkpoints feature tests", - { - foreachx, - fun start/1, fun stop/2, - [{UseCheckpoints, fun use_checkpoints_tests/2} - || UseCheckpoints <- [false, true]] - } - }. - -use_checkpoints_tests(UseCheckpoints, Fun) -> - Pairs = [{local, local}, {local, remote}, - {remote, local}, {remote, remote}], - { - "use_checkpoints: " ++ atom_to_list(UseCheckpoints), - { - foreachx, - fun setup/1, fun teardown/2, - [{{UseCheckpoints, Fun, Pair}, fun should_test_checkpoints/2} - || Pair <- Pairs] - } - }. 
- -should_test_checkpoints({UseCheckpoints, _, {From, To}}, {Source, Target, _}) -> - should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}). -should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source, ?DOCS_COUNT), - should_replicate(Source, Target, UseCheckpoints), - should_compare_databases(Source, Target) - ]}}. - -should_populate_source({remote, Source}, DocCount) -> - should_populate_source(Source, DocCount); -should_populate_source(Source, DocCount) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, DocCount))}. - -should_replicate({remote, Source}, Target, UseCheckpoints) -> - should_replicate(db_url(Source), Target, UseCheckpoints); -should_replicate(Source, {remote, Target}, UseCheckpoints) -> - should_replicate(Source, db_url(Target), UseCheckpoints); -should_replicate(Source, Target, UseCheckpoints) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target, UseCheckpoints))}. - -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. - - -populate_db(DbName, DocCount) -> - {ok, Db} = couch_db:open_int(DbName, []), - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Id = ?io2b(["doc", ?i2l(DocIdCounter)]), - Value = ?io2b(["val", ?i2l(DocIdCounter)]), - Doc = #doc{ - id = Id, - body = {[ {<<"value">>, Value} ]} - }, - [Doc | Acc] - end, - [], lists:seq(1, DocCount)), - {ok, _} = couch_db:update_docs(Db, Docs, []), - ok = couch_db:close(Db). 
- -compare_dbs(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, _, Acc) -> - {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), - {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - DocTarget = case couch_db:open_doc(TargetDb, DocId) of - {ok, DocT} -> - DocT; - Error -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Error opening document '", - ?b2l(DocId), "' from target: ", - couch_util:to_list(Error)])}]}) - end, - DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), - ?assertEqual(DocJson, DocTargetJson), - {ok, Acc} - end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). - -replicate(Source, Target, UseCheckpoints) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"use_checkpoints">>, UseCheckpoints} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _} -> - ok - end. 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/test/couch_replicator_compact_tests.erl ---------------------------------------------------------------------- diff --git a/test/couch_replicator_compact_tests.erl b/test/couch_replicator_compact_tests.erl new file mode 100644 index 0000000..4e019fe --- /dev/null +++ b/test/couch_replicator_compact_tests.erl @@ -0,0 +1,448 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_compact_tests). + +-include("../../../test/couchdb/couch_eunit.hrl"). +-include_lib("couchdb/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + +-define(ADMIN_ROLE, #user_ctx{roles=[<<"_admin">>]}). +-define(ADMIN_USER, {user_ctx, ?ADMIN_ROLE}). +-define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])). +-define(DELAY, 100). +-define(TIMEOUT, 30000). +-define(TIMEOUT_STOP, 1000). +-define(TIMEOUT_WRITER, 3000). +-define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 5). + +setup() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]), + ok = couch_db:close(Db), + DbName. + +setup(local) -> + setup(); +setup(remote) -> + {remote, setup()}; +setup({A, B}) -> + {ok, _} = couch_server_sup:start_link(?CONFIG_CHAIN), + Source = setup(A), + Target = setup(B), + {Source, Target}. + +teardown({remote, DbName}) -> + teardown(DbName); +teardown(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_USER]), + ok. 
+ +teardown(_, {Source, Target}) -> + teardown(Source), + teardown(Target), + + Pid = whereis(couch_server_sup), + erlang:monitor(process, Pid), + couch_server_sup:stop(), + receive + {'DOWN', _, _, Pid, _} -> + ok + after ?TIMEOUT_STOP -> + throw({timeout, server_stop}) + end. + + +compact_test_() -> + Pairs = [{local, local}, {local, remote}, + {remote, local}, {remote, remote}], + { + "Compaction during replication tests", + { + foreachx, + fun setup/1, fun teardown/2, + [{Pair, fun should_populate_replicate_compact/2} + || Pair <- Pairs] + } + }. + + +should_populate_replicate_compact({From, To}, {Source, Target}) -> + {ok, RepPid, RepId} = replicate(Source, Target), + {lists:flatten(io_lib:format("~p -> ~p", [From, To])), + {inorder, [ + should_run_replication(RepPid, RepId, Source, Target), + should_all_processes_be_alive(RepPid, Source, Target), + should_populate_and_compact(RepPid, Source, Target, 50, 5), + should_wait_target_in_sync(Source, Target), + should_ensure_replication_still_running(RepPid, RepId, Source, Target), + should_cancel_replication(RepId, RepPid), + should_compare_databases(Source, Target) + ]}}. + +should_all_processes_be_alive(RepPid, Source, Target) -> + ?_test(begin + {ok, SourceDb} = reopen_db(Source), + {ok, TargetDb} = reopen_db(Target), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(SourceDb#db.main_pid)), + ?assert(is_process_alive(TargetDb#db.main_pid)) + end). + +should_run_replication(RepPid, RepId, Source, Target) -> + ?_test(check_active_tasks(RepPid, RepId, Source, Target)). + +should_ensure_replication_still_running(RepPid, RepId, Source, Target) -> + ?_test(check_active_tasks(RepPid, RepId, Source, Target)). 
+ +check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) -> + Source = case Src of + {remote, NameSrc} -> + <<(db_url(NameSrc))/binary, $/>>; + _ -> + Src + end, + Target = case Tgt of + {remote, NameTgt} -> + <<(db_url(NameTgt))/binary, $/>>; + _ -> + Tgt + end, + FullRepId = ?l2b(BaseId ++ Ext), + Pid = ?l2b(pid_to_list(RepPid)), + [RepTask] = couch_task_status:all(), + ?assertEqual(Pid, couch_util:get_value(pid, RepTask)), + ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)), + ?assertEqual(true, couch_util:get_value(continuous, RepTask)), + ?assertEqual(Source, couch_util:get_value(source, RepTask)), + ?assertEqual(Target, couch_util:get_value(target, RepTask)), + ?assert(is_integer(couch_util:get_value(docs_read, RepTask))), + ?assert(is_integer(couch_util:get_value(docs_written, RepTask))), + ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))), + ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))), + ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))), + ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))), + ?assert(is_integer(couch_util:get_value(source_seq, RepTask))), + Progress = couch_util:get_value(progress, RepTask), + ?assert(is_integer(Progress)), + ?assert(Progress =< 100). + +should_cancel_replication(RepId, RepPid) -> + ?_assertNot(begin + {ok, _} = couch_replicator:cancel_replication(RepId), + is_process_alive(RepPid) + end). 
+ +should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> + {timeout, ?TIMEOUT_EUNIT, ?_test(begin + {ok, SourceDb0} = reopen_db(Source), + Writer = spawn_writer(SourceDb0), + lists:foreach( + fun(N) -> + {ok, SourceDb} = reopen_db(Source), + {ok, TargetDb} = reopen_db(Target), + pause_writer(Writer), + + compact_db("source", SourceDb), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(SourceDb#db.main_pid)), + check_ref_counter("source", SourceDb), + + compact_db("target", TargetDb), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(TargetDb#db.main_pid)), + check_ref_counter("target", TargetDb), + + {ok, SourceDb2} = reopen_db(SourceDb), + {ok, TargetDb2} = reopen_db(TargetDb), + + resume_writer(Writer), + wait_writer(Writer, BatchSize * N), + + compact_db("source", SourceDb2), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(SourceDb2#db.main_pid)), + pause_writer(Writer), + check_ref_counter("source", SourceDb2), + resume_writer(Writer), + + compact_db("target", TargetDb2), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(TargetDb2#db.main_pid)), + pause_writer(Writer), + check_ref_counter("target", TargetDb2), + resume_writer(Writer) + end, lists:seq(1, Rounds)), + stop_writer(Writer) + end)}. + +should_wait_target_in_sync({remote, Source}, Target) -> + should_wait_target_in_sync(Source, Target); +should_wait_target_in_sync(Source, {remote, Target}) -> + should_wait_target_in_sync(Source, Target); +should_wait_target_in_sync(Source, Target) -> + {timeout, ?TIMEOUT_EUNIT, ?_assert(begin + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300) + end)}. 
+ +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error( + {assertion_failed, + [{module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"}]}); +wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) -> + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft); +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(?DELAY), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + +should_compare_databases({remote, Source}, Target) -> + should_compare_databases(Source, Target); +should_compare_databases(Source, {remote, Target}) -> + should_compare_databases(Source, Target); +should_compare_databases(Source, Target) -> + {timeout, 35, ?_test(begin + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, TargetDb} = couch_db:open_int(Target, []), + Fun = fun(FullDocInfo, _, Acc) -> + {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), + {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), + DocId = couch_util:get_value(<<"_id">>, Props), + DocTarget = case couch_db:open_doc(TargetDb, DocId) of + {ok, DocT} -> + DocT; + Error -> + erlang:error( + {assertion_failed, + [{module, ?MODULE}, {line, ?LINE}, + {reason, lists:concat(["Error opening document '", + ?b2l(DocId), "' from target: ", + couch_util:to_list(Error)])}]}) + end, + DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), + ?assertEqual(DocJson, DocTargetJson), + {ok, Acc} + end, + {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), + ok = couch_db:close(SourceDb), + ok = couch_db:close(TargetDb) + end)}. 
+ + +reopen_db({remote, Db}) -> + reopen_db(Db); +reopen_db(#db{name=DbName}) -> + reopen_db(DbName); +reopen_db(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + ok = couch_db:close(Db), + {ok, Db}. + +compact_db(Type, #db{name = Name}) -> + {ok, Db} = couch_db:open_int(Name, []), + {ok, CompactPid} = couch_db:start_compact(Db), + MonRef = erlang:monitor(process, CompactPid), + receive + {'DOWN', MonRef, process, CompactPid, normal} -> + ok; + {'DOWN', MonRef, process, CompactPid, Reason} -> + erlang:error( + {assertion_failed, + [{module, ?MODULE}, {line, ?LINE}, + {reason, + lists:concat(["Error compacting ", Type, " database ", + ?b2l(Name), ": ", + couch_util:to_list(Reason)])}]}) + after ?TIMEOUT -> + erlang:error( + {assertion_failed, + [{module, ?MODULE}, {line, ?LINE}, + {reason, lists:concat(["Compaction for ", Type, " database ", + ?b2l(Name), " didn't finish"])}]}) + end, + ok = couch_db:close(Db). + +check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) -> + MonRef = erlang:monitor(process, OldRefCounter), + receive + {'DOWN', MonRef, process, OldRefCounter, _} -> + ok + after ?TIMEOUT -> + erlang:error( + {assertion_failed, + [{module, ?MODULE}, {line, ?LINE}, + {reason, lists:concat(["Old ", Type, + " database ref counter didn't" + " terminate"])}]}) + end, + {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []), + ok = couch_db:close(Db), + ?assertNotEqual(OldRefCounter, NewRefCounter). + +db_url(DbName) -> + iolist_to_binary([ + "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), + ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), + "/", DbName + ]). 
+ +replicate({remote, Db}, Target) -> + replicate(db_url(Db), Target); + +replicate(Source, {remote, Db}) -> + replicate(Source, db_url(Db)); + +replicate(Source, Target) -> + RepObject = {[ + {<<"source">>, Source}, + {<<"target">>, Target}, + {<<"continuous">>, true} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE), + {ok, Pid} = couch_replicator:async_replicate(Rep), + {ok, Pid, Rep#rep.id}. + + +wait_writer(Pid, NumDocs) -> + case get_writer_num_docs_written(Pid) of + N when N >= NumDocs -> + ok; + _ -> + wait_writer(Pid, NumDocs) + end. + +spawn_writer(Db) -> + Parent = self(), + Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end), + Pid. + + +pause_writer(Pid) -> + Ref = make_ref(), + Pid ! {pause, Ref}, + receive + {paused, Ref} -> + ok + after ?TIMEOUT_WRITER -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Failed to pause source database writer"}]}) + end. + +resume_writer(Pid) -> + Ref = make_ref(), + Pid ! {continue, Ref}, + receive + {ok, Ref} -> + ok + after ?TIMEOUT_WRITER -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Failed to pause source database writer"}]}) + end. + +get_writer_num_docs_written(Pid) -> + Ref = make_ref(), + Pid ! {get_count, Ref}, + receive + {count, Ref, Count} -> + Count + after ?TIMEOUT_WRITER -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout getting number of documents written" + " from source database writer"}]}) + end. + +stop_writer(Pid) -> + Ref = make_ref(), + Pid ! 
{stop, Ref}, + receive + {stopped, Ref, DocsWritten} -> + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, _Reason} -> + DocsWritten + after ?TIMEOUT -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout stopping source database writer"}]}) + end + after ?TIMEOUT_WRITER -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout stopping source database writer"}]}) + end. + +writer_loop(#db{name = DbName}, Parent, Counter) -> + {ok, Data} = file:read_file(?ATTFILE), + maybe_pause(Parent, Counter), + Doc = couch_doc:from_json_obj({[ + {<<"_id">>, ?l2b(integer_to_list(Counter + 1))}, + {<<"value">>, Counter + 1}, + {<<"_attachments">>, {[ + {<<"icon1.png">>, {[ + {<<"data">>, base64:encode(Data)}, + {<<"content_type">>, <<"image/png">>} + ]}}, + {<<"icon2.png">>, {[ + {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))}, + {<<"content_type">>, <<"image/png">>} + ]}} + ]}} + ]}), + maybe_pause(Parent, Counter), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = couch_db:update_doc(Db, Doc, []), + ok = couch_db:close(Db), + receive + {get_count, Ref} -> + Parent ! {count, Ref, Counter + 1}, + writer_loop(Db, Parent, Counter + 1); + {stop, Ref} -> + Parent ! {stopped, Ref, Counter + 1} + after 0 -> + timer:sleep(?DELAY), + writer_loop(Db, Parent, Counter + 1) + end. + +maybe_pause(Parent, Counter) -> + receive + {get_count, Ref} -> + Parent ! {count, Ref, Counter}; + {pause, Ref} -> + Parent ! {paused, Ref}, + receive + {continue, Ref2} -> + Parent ! {ok, Ref2} + end + after 0 -> + ok + end. 
% http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2850b946/test/couch_replicator_httpc_pool_tests.erl
% ----------------------------------------------------------------------
% diff --git a/test/couch_replicator_httpc_pool_tests.erl b/test/couch_replicator_httpc_pool_tests.erl
% new file mode 100644
% index 0000000..6bede4c
% --- /dev/null
% +++ b/test/couch_replicator_httpc_pool_tests.erl
% @@ -0,0 +1,189 @@
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_httpc_pool_tests).

-include("../../../test/couchdb/couch_eunit.hrl").
-include_lib("couchdb/couch_db.hrl").

-define(ADMIN_USER, {user_ctx, #user_ctx{roles=[<<"_admin">>]}}).
-define(TIMEOUT, 1000).


%% Suite-level setup: starts couch_server_sup with ?CONFIG_CHAIN and returns
%% the pid it reports.
start() ->
    {ok, SupPid} = couch_server_sup:start_link(?CONFIG_CHAIN),
    SupPid.

%% Suite-level cleanup: stops couch_server_sup and waits up to ?TIMEOUT ms
%% for the 'DOWN' message of Pid; throws {timeout, server_stop} otherwise.
stop(Pid) ->
    erlang:monitor(process, Pid),
    couch_server_sup:stop(),
    receive
        {'DOWN', _MonRef, _Type, Pid, _Reason} -> ok
    after ?TIMEOUT ->
        throw({timeout, server_stop})
    end.

%% Per-test setup: a fresh pool from spawn_pool/0.
setup() ->
    spawn_pool().

%% Per-test cleanup: stops the pool created by setup/0.
teardown(Pool) ->
    stop_pool(Pool).


%% EUnit generator: a 'setup' fixture (start/0, stop/1) wrapping a 'foreach'
%% fixture (setup/0, teardown/1) around the two pool test cases.
httpc_pool_test_() ->
    Cases = [
        fun should_block_new_clients_when_full/1,
        fun should_replace_worker_on_death/1
    ],
    PerTestFixture = {foreach, fun setup/0, fun teardown/1, Cases},
    SuiteFixture = {setup, fun start/0, fun stop/1, PerTestFixture},
    {"httpc pool tests", SuiteFixture}.
%% With a pool limited to three connections (see spawn_pool/0), three
%% clients each obtain a distinct live worker, a fourth client gets no
%% answer to ping until one of the first three is stopped, and it then
%% receives the worker that the stopped client released.
should_block_new_clients_when_full(Pool) ->
    ?_test(begin
        Client1 = spawn_client(Pool),
        Client2 = spawn_client(Pool),
        Client3 = spawn_client(Pool),

        ?assertEqual(ok, ping_client(Client1)),
        ?assertEqual(ok, ping_client(Client2)),
        ?assertEqual(ok, ping_client(Client3)),

        Worker1 = get_client_worker(Client1, "1"),
        Worker2 = get_client_worker(Client2, "2"),
        Worker3 = get_client_worker(Client3, "3"),

        ?assert(is_process_alive(Worker1)),
        ?assert(is_process_alive(Worker2)),
        ?assert(is_process_alive(Worker3)),

        ?assertNotEqual(Worker1, Worker2),
        ?assertNotEqual(Worker2, Worker3),
        ?assertNotEqual(Worker3, Worker1),

        %% The fourth client cannot answer while it is still waiting
        %% for a worker.
        Client4 = spawn_client(Pool),
        ?assertEqual(timeout, ping_client(Client4)),

        ?assertEqual(ok, stop_client(Client1)),
        ?assertEqual(ok, ping_client(Client4)),

        Worker4 = get_client_worker(Client4, "4"),
        ?assertEqual(Worker1, Worker4),

        lists:foreach(
            fun(C) ->
                ?assertEqual(ok, stop_client(C))
            end, [Client2, Client3, Client4])
    end).

%% After a client's worker is killed and the client is stopped, a new client
%% obtains a live worker that is different from the killed one.
should_replace_worker_on_death(Pool) ->
    ?_test(begin
        Client1 = spawn_client(Pool),
        ?assertEqual(ok, ping_client(Client1)),
        Worker1 = get_client_worker(Client1, "1"),
        ?assert(is_process_alive(Worker1)),

        ?assertEqual(ok, kill_client_worker(Client1)),
        ?assertNot(is_process_alive(Worker1)),
        ?assertEqual(ok, stop_client(Client1)),

        Client2 = spawn_client(Pool),
        ?assertEqual(ok, ping_client(Client2)),
        Worker2 = get_client_worker(Client2, "2"),
        ?assert(is_process_alive(Worker2)),

        ?assertNotEqual(Worker1, Worker2),
        ?assertEqual(ok, stop_client(Client2))
    end).


%% Spawns a client process that first obtains a worker from Pool and then
%% serves requests in loop/4. Returns {Pid, Ref}; Ref tags every reply the
%% client sends back to the calling process.
spawn_client(Pool) ->
    Parent = self(),
    Ref = make_ref(),
    Pid = spawn(fun() ->
        {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
        loop(Parent, Ref, Worker, Pool)
    end),
    {Pid, Ref}.

%% Sends ping to the client; returns ok on {pong, Ref}, or timeout if no
%% reply arrives within ?TIMEOUT ms.
ping_client({Pid, Ref}) ->
    Pid ! ping,
    receive
        {pong, Ref} ->
            ok
    after ?TIMEOUT ->
        timeout
    end.

%% Returns the worker held by the client. ClientName (a string) is used only
%% in the assertion_failed error raised when the client does not answer
%% within ?TIMEOUT ms.
get_client_worker({Pid, Ref}, ClientName) ->
    Pid ! get_worker,
    receive
        {worker, Ref, Worker} ->
            Worker
    after ?TIMEOUT ->
        erlang:error(
            {assertion_failed,
             [{module, ?MODULE}, {line, ?LINE},
              {reason, "Timeout getting client " ++ ClientName ++ " worker"}]})
    end.

%% Asks the client to release its worker and terminate; returns ok on
%% {stop, Ref}, or timeout if no reply arrives within ?TIMEOUT ms.
stop_client({Pid, Ref}) ->
    Pid ! stop,
    receive
        {stop, Ref} ->
            ok
    after ?TIMEOUT ->
        timeout
    end.

%% Kills the worker held by the client and returns ok only once the worker
%% has actually terminated. Returns timeout if the client does not report
%% its worker, or the worker does not go down, within ?TIMEOUT ms each.
kill_client_worker({Pid, Ref}) ->
    Pid ! get_worker,
    receive
        {worker, Ref, Worker} ->
            %% exit/2 only sends a signal; monitor the worker and wait for
            %% 'DOWN' so callers may assert it is dead as soon as we return.
            MonRef = erlang:monitor(process, Worker),
            exit(Worker, kill),
            receive
                {'DOWN', MonRef, process, Worker, _Reason} ->
                    ok
            after ?TIMEOUT ->
                %% Drop the monitor and any late 'DOWN' so it cannot leak
                %% into a later receive in the test process.
                erlang:demonitor(MonRef, [flush]),
                timeout
            end
    after ?TIMEOUT ->
        timeout
    end.

%% Client process body. Answers ping with {pong, Ref} and get_worker with
%% {worker, Ref, Worker}; on stop it releases Worker back to Pool, replies
%% {stop, Ref} and returns.
loop(Parent, Ref, Worker, Pool) ->
    receive
        ping ->
            Parent ! {pong, Ref},
            loop(Parent, Ref, Worker, Pool);
        get_worker ->
            Parent ! {worker, Ref, Worker},
            loop(Parent, Ref, Worker, Pool);
        stop ->
            couch_replicator_httpc_pool:release_worker(Pool, Worker),
            Parent ! {stop, Ref}
    end.

%% Starts an httpc pool for the URL built from the configured httpd
%% bind_address and port (defaults "127.0.0.1" and "5984"), limited to
%% three connections. Returns the pool.
spawn_pool() ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    Port = couch_config:get("httpd", "port", "5984"),
    {ok, Pool} = couch_replicator_httpc_pool:start_link(
        "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
    Pool.

%% Stops the pool; crashes with badmatch if the pool does not return ok.
stop_pool(Pool) ->
    ok = couch_replicator_httpc_pool:stop(Pool).