couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1169826 - in /couchdb/trunk/test/etap: 240-replication-compact.t Makefile.am
Date Mon, 12 Sep 2011 16:44:28 GMT
Author: fdmanana
Date: Mon Sep 12 16:44:28 2011
New Revision: 1169826

URL: http://svn.apache.org/viewvc?rev=1169826&view=rev
Log:
Add test 240-replication-compact.t

The purpose of this test is to verify that compacting databases
that are being used as the source or target of a replication
doesn't affect the replication and that the replication doesn't
hold their reference counters forever.


Added:
    couchdb/trunk/test/etap/240-replication-compact.t
Modified:
    couchdb/trunk/test/etap/Makefile.am

Added: couchdb/trunk/test/etap/240-replication-compact.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/240-replication-compact.t?rev=1169826&view=auto
==============================================================================
--- couchdb/trunk/test/etap/240-replication-compact.t (added)
+++ couchdb/trunk/test/etap/240-replication-compact.t Mon Sep 12 16:44:28 2011
@@ -0,0 +1,438 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Verify that compacting databases that are being used as the source or
+% target of a replication doesn't affect the replication and that the
+% replication doesn't hold their reference counters forever.
+
+-define(b2l(B), binary_to_list(B)).
+
+-record(user_ctx, {
+    name = null,
+    roles = [],
+    handler
+}).
+
+-record(db, {
+    main_pid = nil,
+    update_pid = nil,
+    compactor_pid = nil,
+    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+    fd,
+    updater_fd,
+    fd_ref_counter,
+    header = nil,
+    committed_update_seq,
+    fulldocinfo_by_id_btree,
+    docinfo_by_seq_btree,
+    local_docs_btree,
+    update_seq,
+    name,
+    filepath,
+    validate_doc_funs = [],
+    security = [],
+    security_ptr = nil,
+    user_ctx = #user_ctx{},
+    waiting_delayed_commit = nil,
+    revs_limit = 1000,
+    fsync_options = [],
+    options = [],
+    compression
+}).
+
+-record(rep, {
+    id,
+    source,
+    target,
+    options,
+    user_ctx,
+    doc_id
+}).
+
+
+source_db_name() -> <<"couch_test_rep_db_a">>.
+target_db_name() -> <<"couch_test_rep_db_b">>.
+
+
+main(_) ->
+    test_util:init_code_path(),
+
+    etap:plan(264),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+
+    Pairs = [
+        {source_db_name(), target_db_name()},
+        {{remote, source_db_name()}, target_db_name()},
+        {source_db_name(), {remote, target_db_name()}},
+        {{remote, source_db_name()}, {remote, (target_db_name())}}
+    ],
+
+    lists:foreach(
+        fun({Source, Target}) ->
+            {ok, SourceDb} = create_db(source_db_name()),
+            etap:is(couch_db:is_idle(SourceDb), true,
+                "Source database is idle before starting replication"),
+
+            {ok, TargetDb} = create_db(target_db_name()),
+            etap:is(couch_db:is_idle(TargetDb), true,
+                "Target database is idle before starting replication"),
+
+            {ok, RepPid, RepId} = replicate(Source, Target),
+            {ok, DocsWritten} = populate_and_compact_test(
+                RepPid, SourceDb, TargetDb),
+
+            wait_target_in_sync(DocsWritten, TargetDb),
+            cancel_replication(RepId, RepPid),
+            compare_dbs(SourceDb, TargetDb),
+
+            delete_db(SourceDb),
+            delete_db(TargetDb),
+            couch_server_sup:stop(),
+            ok = timer:sleep(1000),
+            couch_server_sup:start_link(test_util:config_files())
+        end,
+        Pairs),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
+    etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
+    check_db_alive("source", SourceDb0),
+    check_db_alive("target", TargetDb0),
+
+    Writer = spawn_writer(SourceDb0),
+
+    lists:foldl(
+        fun(_, {SourceDb, TargetDb, DocCount}) ->
+            pause_writer(Writer),
+
+            compact_db("source", SourceDb),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after source database compaction"),
+            check_db_alive("source", SourceDb),
+            check_ref_counter("source", SourceDb),
+
+            compact_db("target", TargetDb),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after target database compaction"),
+            check_db_alive("target", TargetDb),
+            check_ref_counter("target", TargetDb),
+
+            {ok, SourceDb2} = reopen_db(SourceDb),
+            {ok, TargetDb2} = reopen_db(TargetDb),
+
+            resume_writer(Writer),
+            wait_writer(Writer, DocCount),
+
+            compact_db("source", SourceDb2),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after source database compaction"),
+            check_db_alive("source", SourceDb2),
+            pause_writer(Writer),
+            check_ref_counter("source", SourceDb2),
+            resume_writer(Writer),
+
+            compact_db("target", TargetDb2),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after target database compaction"),
+            check_db_alive("target", TargetDb2),
+            pause_writer(Writer),
+            check_ref_counter("target", TargetDb2),
+            resume_writer(Writer),
+
+            {ok, SourceDb3} = reopen_db(SourceDb2),
+            {ok, TargetDb3} = reopen_db(TargetDb2),
+            {SourceDb3, TargetDb3, DocCount + 50}
+        end,
+        {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
+
+    DocsWritten = stop_writer(Writer),
+    {ok, DocsWritten}.
+
+
+check_db_alive(Type, #db{main_pid = Pid}) ->
+    etap:is(is_process_alive(Pid), true,
+        "Local " ++ Type ++ " database main pid is alive").
+
+
+compact_db(Type, #db{name = Name}) ->
+    {ok, Db} = couch_db:open_int(Name, []),
+    {ok, CompactPid} = couch_db:start_compact(Db),
+    MonRef = erlang:monitor(process, CompactPid),
+    receive
+    {'DOWN', MonRef, process, CompactPid, normal} ->
+        ok;
+    {'DOWN', MonRef, process, CompactPid, Reason} ->
+        etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
+            ": " ++ couch_util:to_list(Reason))
+    after 30000 ->
+        etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
+            " didn't finish")
+    end,
+    ok = couch_db:close(Db).
+
+
+check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
+    MonRef = erlang:monitor(process, OldRefCounter),
+    receive
+    {'DOWN', MonRef, process, OldRefCounter, _} ->
+        etap:diag("Old " ++ Type ++ " database ref counter terminated")
+    after 30000 ->
+        etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
+    end,
+    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
+    ok = couch_db:close(Db),
+    etap:isnt(
+        NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
+
+
+reopen_db(#db{name = Name}) ->
+    {ok, Db} = couch_db:open_int(Name, []),
+    ok = couch_db:close(Db),
+    {ok, Db}.
+
+
+wait_target_in_sync(DocCount, #db{name = TargetName}) ->
+    wait_target_in_sync_loop(DocCount, TargetName, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    etap:bail("Could not get source and target databases in sync");
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    {ok, Target} = couch_db:open_int(TargetName, []),
+    {ok, TargetInfo} = couch_db:get_db_info(Target),
+    ok = couch_db:close(Target),
+    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+    case TargetDocCount == DocCount of
+    true ->
+        etap:diag("Source and target databases are in sync");
+    false ->
+        ok = timer:sleep(100),
+        wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+
+compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
+    {ok, SourceDb} = couch_db:open_int(SourceName, []),
+    {ok, TargetDb} = couch_db:open_int(TargetName, []),
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
+        {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
+        DocId = couch_util:get_value(<<"_id">>, Props),
+        DocTarget = case couch_db:open_doc(TargetDb, DocId) of
+        {ok, DocT} ->
+            DocT;
+        Error ->
+            etap:bail("Error opening document '" ++ ?b2l(DocId) ++
+                "' from target: " ++ couch_util:to_list(Error))
+        end,
+        DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
+        case DocTargetJson of
+        DocJson ->
+            ok;
+        _ ->
+            etap:bail("Content from document '" ++ ?b2l(DocId) ++
+                "' differs in target database")
+        end,
+        {ok, Acc}
+    end,
+    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    etap:diag("Target database has the same documents as the source database"),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb).
+
+
+wait_writer(Pid, NumDocs) ->
+    case get_writer_num_docs_written(Pid) of
+    N when N >= NumDocs ->
+        ok;
+    _ ->
+        wait_writer(Pid, NumDocs)
+    end.
+
+
+spawn_writer(Db) ->
+    Parent = self(),
+    Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
+    etap:diag("Started source database writer"),
+    Pid.
+
+
+pause_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {pause, Ref},
+    receive
+    {paused, Ref} ->
+        ok
+    after 30000 ->
+        etap:bail("Failed to pause source database writer")
+    end.
+
+
+resume_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {continue, Ref},
+    receive
+    {ok, Ref} ->
+        ok
+    after 30000 ->
+        etap:bail("Failed to unpause source database writer")
+    end.
+
+
+get_writer_num_docs_written(Pid) ->
+    Ref = make_ref(),
+    Pid ! {get_count, Ref},
+    receive
+    {count, Ref, Count} ->
+        Count
+    after 30000 ->
+        etap:bail("Timeout getting number of documents written from "
+            "source database writer")
+    end.
+
+
+stop_writer(Pid) ->
+    Ref = make_ref(),
+    Pid ! {stop, Ref},
+    receive
+    {stopped, Ref, DocsWritten} ->
+        MonRef = erlang:monitor(process, Pid),
+        receive
+        {'DOWN', MonRef, process, Pid, _Reason} ->
+            etap:diag("Stopped source database writer"),
+            DocsWritten
+        after 30000 ->
+            etap:bail("Timeout stopping source database writer")
+        end
+    after 30000 ->
+        etap:bail("Timeout stopping source database writer")
+    end.
+
+
+writer_loop(#db{name = DbName}, Parent, Counter) ->
+    maybe_pause(Parent, Counter),
+    Doc = couch_doc:from_json_obj({[
+        {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
+        {<<"value">>, Counter + 1},
+        {<<"_attachments">>, {[
+            {<<"icon1.png">>, {[
+                {<<"data">>, base64:encode(att_data())},
+                {<<"content_type">>, <<"image/png">>}
+            ]}},
+            {<<"icon2.png">>, {[
+                {<<"data">>, base64:encode(iolist_to_binary(
+                    [att_data(), att_data()]))},
+                {<<"content_type">>, <<"image/png">>}
+            ]}}
+        ]}}
+    ]}),
+    maybe_pause(Parent, Counter),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, _} = couch_db:update_doc(Db, Doc, []),
+    ok = couch_db:close(Db),
+    receive
+    {get_count, Ref} ->
+        Parent ! {count, Ref, Counter + 1},
+        writer_loop(Db, Parent, Counter + 1);
+    {stop, Ref} ->
+        Parent ! {stopped, Ref, Counter + 1}
+    after 0 ->
+        ok = timer:sleep(50),
+        writer_loop(Db, Parent, Counter + 1)
+    end.
+
+
+maybe_pause(Parent, Counter) ->
+    receive
+    {get_count, Ref} ->
+        Parent ! {count, Ref, Counter};
+    {pause, Ref} ->
+        Parent ! {paused, Ref},
+        receive {continue, Ref2} -> Parent ! {ok, Ref2} end
+    after 0 ->
+        ok
+    end.
+
+
+db_url(DbName) ->
+    iolist_to_binary([
+        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
+        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+        "/", DbName
+    ]).
+
+
+create_db(DbName) ->
+    {ok, Db} = couch_db:create(
+        DbName,
+        [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
+    couch_db:close(Db),
+    {ok, Db}.
+
+
+delete_db(#db{name = DbName, main_pid = Pid}) ->
+    ok = couch_server:delete(
+        DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
+    MonRef = erlang:monitor(process, Pid),
+    receive
+    {'DOWN', MonRef, process, Pid, _Reason} ->
+        ok
+    after 30000 ->
+        etap:bail("Timeout deleting database")
+    end.
+
+
+replicate({remote, Db}, Target) ->
+    replicate(db_url(Db), Target);
+
+replicate(Source, {remote, Db}) ->
+    replicate(Source, db_url(Db));
+
+replicate(Source, Target) ->
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target},
+        {<<"continuous">>, true}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
+        RepObject, #user_ctx{roles = [<<"_admin">>]}),
+    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    {ok, Pid, Rep#rep.id}.
+
+
+cancel_replication(RepId, RepPid) ->
+    {ok, _} = couch_replicator:cancel_replication(RepId),
+    etap:is(is_process_alive(RepPid), false,
+        "Replication process is no longer alive after cancel").
+
+
+att_data() ->
+    {ok, Data} = file:read_file(
+        test_util:source_file("share/www/image/logo.png")),
+    Data.

Modified: couchdb/trunk/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/Makefile.am?rev=1169826&r1=1169825&r2=1169826&view=diff
==============================================================================
--- couchdb/trunk/test/etap/Makefile.am (original)
+++ couchdb/trunk/test/etap/Makefile.am Mon Sep 12 16:44:28 2011
@@ -86,4 +86,5 @@ EXTRA_DIST = \
     200-view-group-no-db-leaks.t \
     210-os-proc-pool.t \
     220-compaction-daemon.t \
-    230-httpc-pool.t
+    230-httpc-pool.t \
+    240-replication-compact.t



Mime
View raw message