Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C85D31018F for ; Thu, 13 Feb 2014 18:14:21 +0000 (UTC) Received: (qmail 77064 invoked by uid 500); 13 Feb 2014 18:13:20 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 76018 invoked by uid 500); 13 Feb 2014 18:12:34 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 74880 invoked by uid 99); 13 Feb 2014 18:12:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Feb 2014 18:12:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 46DEE8A928B; Thu, 13 Feb 2014 18:12:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benoitc@apache.org To: commits@couchdb.apache.org Date: Thu, 13 Feb 2014 18:12:43 -0000 Message-Id: <1ac7255a00514b009a8c9540c4f28a57@git.apache.org> In-Reply-To: <113237e1f9904b5cb0e6f0a915a4b1ba@git.apache.org> References: <113237e1f9904b5cb0e6f0a915a4b1ba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [45/57] [abbrv] remove couch_replicator http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator_worker.erl ---------------------------------------------------------------------- diff --git a/apps/couch_replicator/src/couch_replicator_worker.erl b/apps/couch_replicator/src/couch_replicator_worker.erl deleted file mode 100644 index 0f65900..0000000 --- a/apps/couch_replicator/src/couch_replicator_worker.erl +++ /dev/null @@ -1,515 +0,0 @@ -% Licensed under the 
Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_worker). --behaviour(gen_server). - -% public API --export([start_link/5]). - -% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). --include("couch_replicator.hrl"). - -% TODO: maybe make both buffer max sizes configurable --define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets --define(DOC_BUFFER_LEN, 10). % for local targets, # of documents --define(MAX_BULK_ATT_SIZE, 64 * 1024). --define(MAX_BULK_ATTS_PER_DOC, 8). --define(STATS_DELAY, 10000000). % 10 seconds (in microseconds) - --define(inc_stat(StatPos, Stats, Inc), - setelement(StatPos, Stats, element(StatPos, Stats) + Inc)). - --import(couch_replicator_utils, [ - open_db/1, - close_db/1, - start_db_compaction_notifier/2, - stop_db_compaction_notifier/1 -]). --import(couch_util, [ - to_binary/1, - get_value/2, - get_value/3 -]). - - --record(batch, { - docs = [], - size = 0 -}). - --record(state, { - cp, - loop, - max_parallel_conns, - source, - target, - readers = [], - writer = nil, - pending_fetch = nil, - flush_waiter = nil, - stats = #rep_stats{}, - source_db_compaction_notifier = nil, - target_db_compaction_notifier = nil, - batch = #batch{} -}). 
% Starts a replication worker.
%
% With a local source (#db{}) the worker is a plain linked process running
% queue_fetch_loop/5; the checkpoint process Cp is passed as both Parent and
% Cp. For any other source a gen_server is started (see init/1).
start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) ->
    Worker = spawn_link(fun() ->
        erlang:put(last_stats_report, now()),
        queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
    end),
    {ok, Worker};

start_link(Cp, Source, Target, ChangesManager, MaxConns) ->
    InitArgs = {Cp, Source, Target, ChangesManager, MaxConns},
    gen_server:start_link(?MODULE, InitArgs, []).


% gen_server init: traps exits (readers, the writer and the fetch loop are all
% linked to this process), spawns the fetch loop, opens both databases and
% starts one compaction notifier per database.
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
    process_flag(trap_exit, true),
    Self = self(),
    Loop = spawn_link(fun() ->
        queue_fetch_loop(Source, Target, Self, Cp, ChangesManager)
    end),
    erlang:put(last_stats_report, now()),
    SourceDb = open_db(Source),
    TargetDb = open_db(Target),
    SourceNotifier = start_db_compaction_notifier(Source, Self),
    TargetNotifier = start_db_compaction_notifier(Target, Self),
    {ok, #state{
        cp = Cp,
        max_parallel_conns = MaxConns,
        loop = Loop,
        source = SourceDb,
        target = TargetDb,
        source_db_compaction_notifier = SourceNotifier,
        target_db_compaction_notifier = TargetNotifier
    }}.


% {fetch_doc, Params} - only accepted from the fetch loop and only while no
% other fetch is pending. Spawns a reader if fewer than max_parallel_conns are
% running; otherwise parks the request (no reply yet) until a reader exits.
handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
    #state{loop = Pid, readers = Readers, pending_fetch = nil,
        source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
    case length(Readers) < MaxConns of
    true ->
        NewReader = spawn_doc_reader(Src, Tgt, Params),
        {reply, ok, State#state{readers = [NewReader | Readers]}};
    false ->
        {noreply, State#state{pending_fetch = {From, Params}}}
    end;

% {batch_doc, Doc} - reply first so the reader can continue, then buffer the
% document (which may trigger a flush).
handle_call({batch_doc, Doc}, From, State) ->
    gen_server:reply(From, ok),
    {noreply, maybe_flush_docs(Doc, State)};

% {add_stats, IncStats} - reply first, then merge the stats and possibly
% forward them to the checkpoint process.
handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
    gen_server:reply(From, ok),
    Summed = couch_replicator_utils:sum_stats(Stats, IncStats),
    Remaining = maybe_report_stats(State#state.cp, Summed),
    {noreply, State#state{stats = Remaining}};

% flush - only accepted from the fetch loop while no writer or flush is in
% progress. The writer is spawned right away when no readers are running;
% otherwise it is spawned when the last reader exits (see handle_info/2).
% The caller is answered from after_full_flush/1.
handle_call(flush, {Pid, _} = From,
    #state{loop = Pid, writer = nil, flush_waiter = nil,
        target = Target, batch = Batch} = State) ->
    Flushing = case State#state.readers of
    [] ->
        State#state{writer = spawn_writer(Target, Batch)};
    _ ->
        State
    end,
    {noreply, Flushing#state{flush_waiter = From}}.


% Compaction notifications: reopen whichever local database was compacted.
% Any other cast stops the server.
handle_cast({db_compacted, DbName},
    #state{source = #db{name = DbName} = Source} = State) ->
    {ok, Reopened} = couch_db:reopen(Source),
    {noreply, State#state{source = Reopened}};

handle_cast({db_compacted, DbName},
    #state{target = #db{name = DbName} = Target} = State) ->
    {ok, Reopened} = couch_db:reopen(Target),
    {noreply, State#state{target = Reopened}};

handle_cast(Msg, State) ->
    {stop, {unexpected_async_call, Msg}, State}.


% Fetch loop finished normally: assert that nothing is left in flight
% (crashes with badmatch otherwise) and stop.
handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
    #state{
        batch = #batch{docs = []}, readers = [], writer = nil,
        pending_fetch = nil, flush_waiter = nil
    } = State,
    {stop, normal, State};

% Writer finished normally: the flush is complete.
handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
    {noreply, after_full_flush(State)};

% Some linked process exited normally while no writer is running.
handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
    #state{
        readers = Readers, batch = Batch,
        source = Source, target = Target,
        pending_fetch = Fetch, flush_waiter = FlushWaiter
    } = State,
    case Readers -- [Pid] of
    Readers ->
        % Pid was not one of our readers; nothing to update.
        {noreply, State};
    Remaining ->
        NewState = case Fetch of
        nil ->
            % The writer is known to be nil here (clause head), so a waiting
            % flush can start as soon as the last reader is gone.
            case (FlushWaiter =/= nil) andalso (Remaining =:= []) of
            true ->
                State#state{
                    readers = Remaining,
                    writer = spawn_writer(Target, Batch)
                };
            false ->
                State#state{readers = Remaining}
            end;
        {From, FetchParams} ->
            % A slot freed up: serve the parked fetch request.
            Reader = spawn_doc_reader(Source, Target, FetchParams),
            gen_server:reply(From, ok),
            State#state{
                readers = [Reader | Remaining],
                pending_fetch = nil
            }
        end,
        {noreply, NewState}
    end;

% Any other exit (including abnormal ones) stops the worker.
handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
% gen_server terminate: close both databases and stop both compaction
% notifiers.
terminate(_Reason, State) ->
    SourceNotifier = State#state.source_db_compaction_notifier,
    TargetNotifier = State#state.target_db_compaction_notifier,
    close_db(State#state.source),
    close_db(State#state.target),
    stop_db_compaction_notifier(SourceNotifier),
    stop_db_compaction_notifier(TargetNotifier).


% No state migration is needed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Main worker loop: asks the changes manager for a batch, replicates the
% revisions missing on the target, reports the finished sequence to Cp and
% repeats. Ends when the changes manager answers {closed, ChangesManager}.
%
% A local source (#db{}) is processed in this process; for a remote source
% (#httpdb{}) the work is delegated to the gen_server Parent.
queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
    ChangesManager ! {get_changes, self()},
    receive
    {closed, ChangesManager} ->
        ok;
    {changes, ChangesManager, Changes, ReportSeq} ->
        TargetDb = open_db(Target),
        {IdRevs, MissingStats} = find_missing(Changes, TargetDb),
        Stats = case Source of
        #db{} ->
            SourceDb = open_db(Source),
            LocalStats = local_process_batch(
                IdRevs, Cp, SourceDb, TargetDb, #batch{}, MissingStats),
            close_db(SourceDb),
            LocalStats;
        #httpdb{} ->
            ok = gen_server:call(Parent, {add_stats, MissingStats}, infinity),
            remote_process_batch(IdRevs, Parent),
            {ok, RemoteStats} = gen_server:call(Parent, flush, infinity),
            RemoteStats
        end,
        close_db(TargetDb),
        ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
        erlang:put(last_stats_report, now()),
        ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
        queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
    end.
% Replicates a list of {Id, Revs, PAs} entries from a local source in the
% calling process. Returns the accumulated #rep_stats{}.
local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
    % Nothing left to fetch and nothing buffered.
    Stats;

local_process_batch([], _Cp, _Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
    % Nothing left to fetch: flush whatever is still buffered. For an HTTP
    % target the batch size is counted in bytes, for a local one in documents.
    case Target of
    #httpdb{} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
    #db{} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
    end,
    FlushStats = flush_docs(Target, Docs),
    couch_replicator_utils:sum_stats(Stats, FlushStats);

local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
    HandlerAcc = {Target, [], Stats, Cp},
    {ok, {_, DocList, ReadStats, _}} = fetch_doc(
        Source, IdRevs, fun local_doc_handler/2, HandlerAcc),
    BufferDoc = fun(Doc, {BatchAcc, StatsAcc}) ->
        {NewBatch, WriteStats} = maybe_flush_docs(Target, BatchAcc, Doc),
        {NewBatch, couch_replicator_utils:sum_stats(StatsAcc, WriteStats)}
    end,
    {Batch2, Stats2} = lists:foldl(BufferDoc, {Batch, ReadStats}, DocList),
    local_process_batch(Rest, Cp, Source, Target, Batch2, Stats2).


% Asks the gen_server Parent to fetch every missing revision.
%
% When the source is a remote database, we fetch a single document revision
% per HTTP request. This is mostly to facilitate retrying of HTTP requests
% due to network transient failures. It also helps not exceeding the maximum
% URL length allowed by proxies and Mochiweb.
remote_process_batch(IdRevs, Parent) ->
    lists:foreach(
        fun({Id, Revs, PAs}) ->
            lists:foreach(
                fun(Rev) ->
                    Request = {fetch_doc, {Id, [Rev], PAs}},
                    ok = gen_server:call(Parent, Request, infinity)
                end,
                Revs)
        end,
        IdRevs).


% Spawns a linked process that opens the source, fetches the requested
% revisions (handing each one to remote_doc_handler/2) and closes the source.
spawn_doc_reader(Source, Target, FetchParams) ->
    Server = self(),
    ReadDocs = fun() ->
        SourceDb = open_db(Source),
        fetch_doc(
            SourceDb, FetchParams, fun remote_doc_handler/2, {Server, Target}),
        close_db(SourceDb)
    end,
    spawn_link(ReadDocs).
% Opens the given revisions of a document on Source, feeding each result to
% DocHandler. If the handler throws {missing_stub, _} (attachment stubs out of
% sync with the target) the fetch is retried once without the atts_since
% option.
fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
    try
        couch_replicator_api_wrap:open_doc_revs(
            Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
    catch
    throw:{missing_stub, _} ->
        ?LOG_ERROR("Retrying fetch and update of document `~s` due to out of "
            "sync attachment stubs. Missing revisions are: ~s",
            [Id, couch_doc:revs_to_strs(Revs)]),
        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc)
    end.


% Opens the target, writes a single document and closes the target again.
% Returns true when flush_doc/2 returned ok, false otherwise.
%
% flush_doc/2 rethrows {missing_stub, _}; the 'after' clause makes sure the
% handle opened here is closed on that path too, instead of being leaked
% while fetch_doc/4 retries.
flush_doc_and_close(Target, Doc) ->
    TargetDb = open_db(Target),
    try
        flush_doc(TargetDb, Doc) =:= ok
    after
        close_db(TargetDb)
    end.


% Document handler used for local sources. Documents accepted by batch_doc/1
% are collected in DocList; the others are written to the target immediately.
local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
    Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
    case batch_doc(Doc) of
    true ->
        {ok, {Target, [Doc | DocList], Stats2, Cp}};
    false ->
        ?LOG_DEBUG("Worker flushing doc with attachments", []),
        Stats3 = case flush_doc_and_close(Target, Doc) of
        true ->
            ?inc_stat(#rep_stats.docs_written, Stats2, 1);
        false ->
            ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
        end,
        Stats4 = maybe_report_stats(Cp, Stats3),
        {ok, {Target, DocList, Stats4, Cp}}
    end;
local_doc_handler(_, Acc) ->
    {ok, Acc}.


% Document handler used by reader processes for remote sources. Documents
% without attachments are handed to the gen_server Parent for batching.
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
    ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
    {ok, Acc};
remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
    % Immediately flush documents with attachments received from a remote
    % source. The data property of each attachment is a function that starts
    % streaming the attachment data from the remote source, therefore it's
    % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
    Stats = #rep_stats{docs_read = 1},
    ?LOG_DEBUG("Worker flushing doc with attachments", []),
    {Result, Stats2} = case flush_doc_and_close(Target, Doc) of
    true ->
        {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
    false ->
        {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
    end,
    ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
    Result;
remote_doc_handler(_, Acc) ->
    {ok, Acc}.


% Spawns a linked process that writes the buffered batch to the target and
% sends the resulting stats back to the calling gen_server.
spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
    % Only log when there is something to flush; the size is in bytes for an
    % HTTP target and in documents for a local one.
    case {Target, Size > 0} of
    {#httpdb{}, true} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
    {#db{}, true} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
    _ ->
        ok
    end,
    Parent = self(),
    spawn_link(
        fun() ->
            Target2 = open_db(Target),
            Stats = flush_docs(Target2, DocList),
            close_db(Target2),
            ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
        end).


% Called when the writer has exited normally: answers the process waiting on
% the flush with the accumulated stats and resets stats, writer and batch.
after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
    gen_server:reply(Waiter, {ok, Stats}),
    erlang:put(last_stats_report, now()),
    State#state{
        stats = #rep_stats{},
        flush_waiter = nil,
        writer = nil,
        batch = #batch{}
    }.


% gen_server-side batching: buffers Doc (flushing if needed), counts it as
% read and possibly reports the stats to the checkpoint process.
maybe_flush_docs(Doc, State) ->
    #state{
        target = Target, batch = Batch,
        stats = Stats, cp = Cp
    } = State,
    {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
    Stats2 = couch_replicator_utils:sum_stats(Stats, WStats),
    Stats3 = ?inc_stat(#rep_stats.docs_read, Stats2, 1),
    Stats4 = maybe_report_stats(Cp, Stats3),
    State#state{stats = Stats4, batch = Batch2}.
% Adds Doc to the batch, flushing the batch to the target when it grows past
% its limit. Returns {NewBatch, WriteStats}.
%
% HTTP target: documents are buffered as encoded JSON and the batch size is
% counted in bytes (?DOC_BUFFER_BYTE_SIZE). Documents rejected by batch_doc/1
% are written on their own and leave the batch untouched.
maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
    #batch{docs = Buffered, size = BufferedBytes} = Batch,
    case batch_doc(Doc) of
    true ->
        JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
        NewBytes = BufferedBytes + iolist_size(JsonDoc),
        case NewBytes > ?DOC_BUFFER_BYTE_SIZE of
        true ->
            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [NewBytes]),
            {#batch{}, flush_docs(Target, [JsonDoc | Buffered])};
        false ->
            {#batch{docs = [JsonDoc | Buffered], size = NewBytes}, #rep_stats{}}
        end;
    false ->
        ?LOG_DEBUG("Worker flushing doc with attachments", []),
        case flush_doc(Target, Doc) of
        ok ->
            {Batch, #rep_stats{docs_written = 1}};
        _ ->
            {Batch, #rep_stats{doc_write_failures = 1}}
        end
    end;

% Local target: documents are buffered as-is and the batch size is the number
% of documents (?DOC_BUFFER_LEN).
maybe_flush_docs(#db{} = Target, #batch{docs = Buffered, size = Count}, Doc) ->
    NewCount = Count + 1,
    case NewCount >= ?DOC_BUFFER_LEN of
    true ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [NewCount]),
        {#batch{}, flush_docs(Target, [Doc | Buffered])};
    false ->
        {#batch{docs = [Doc | Buffered], size = NewCount}, #rep_stats{}}
    end.


% Tells whether a document may go into a bulk update: it has no attachments,
% or at most ?MAX_BULK_ATTS_PER_DOC attachments that are each no larger than
% ?MAX_BULK_ATT_SIZE and are not stubs.
batch_doc(#doc{atts = []}) ->
    true;
batch_doc(#doc{atts = Atts}) ->
    IsSmallAndInline = fun(#att{disk_len = Len, data = Data}) ->
        (Len =< ?MAX_BULK_ATT_SIZE) andalso (Data =/= stub)
    end,
    (length(Atts) =< ?MAX_BULK_ATTS_PER_DOC) andalso
        lists:all(IsSmallAndInline, Atts).


% Writes a list of documents to the target in one bulk update, logs every
% document the target reported an error for and returns the write stats.
flush_docs(_Target, []) ->
    #rep_stats{};

flush_docs(Target, DocList) ->
    {ok, Errors} = couch_replicator_api_wrap:update_docs(
        Target, DocList, [delay_commit], replicated_changes),
    DbUri = couch_replicator_api_wrap:db_uri(Target),
    LogError = fun({Props}) ->
        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
            " to target database `~s`. Error: `~s`, reason: `~s`.",
            [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
                get_value(error, Props, ""), get_value(reason, Props, "")])
    end,
    lists:foreach(LogError, Errors),
    Failures = length(Errors),
    #rep_stats{
        docs_written = length(DocList) - Failures,
        doc_write_failures = Failures
    }.


% Writes a single document to the target. Returns ok on success; otherwise
% logs the problem and returns the error term. A thrown {missing_stub, _} is
% rethrown unchanged.
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
    try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
    {ok, _} ->
        ok;
    Error ->
        ?LOG_ERROR("Replicator: error writing document `~s` to `~s`: ~s",
            [Id, couch_replicator_api_wrap:db_uri(Target), couch_util:to_binary(Error)]),
        Error
    catch
    % This clause must stay ahead of the generic two-tuple clause below.
    throw:{missing_stub, _} = MissingStub ->
        throw(MissingStub);
    throw:{Error, Reason} ->
        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
            " to target database `~s`. Error: `~s`, reason: `~s`.",
            [Id, couch_doc:rev_to_str({Pos, RevId}),
                couch_replicator_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]),
        {error, Error};
    throw:Err ->
        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
            " to target database `~s`. Error: `~s`.",
            [Id, couch_doc:rev_to_str({Pos, RevId}),
                couch_replicator_api_wrap:db_uri(Target), to_binary(Err)]),
        {error, Err}
    end.


% Asks the target which of the revisions in DocInfos it is missing.
% Returns {Missing, Stats}, where Stats counts the revisions checked and the
% revisions found missing.
find_missing(DocInfos, Target) ->
    IdRevs = lists:map(
        fun(#doc_info{id = Id, revs = RevsInfo}) ->
            {Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]}
        end,
        DocInfos),
    CheckedCount = lists:sum([length(Revs) || {_Id, Revs} <- IdRevs]),
    {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
    MissingCount = lists:sum(
        [length(MissingRevs) || {_Id, MissingRevs, _PAs} <- Missing]),
    Stats = #rep_stats{
        missing_checked = CheckedCount,
        missing_found = MissingCount
    },
    {Missing, Stats}.
% Forwards the accumulated stats to the checkpoint process Cp when at least
% ?STATS_DELAY microseconds have passed since the last report stored in the
% process dictionary under last_stats_report.
%
% Returns an empty #rep_stats{} after reporting (the stats now live in Cp) or
% the unchanged Stats when it is too early to report.
maybe_report_stats(Cp, Stats) ->
    Now = now(),
    % timer:now_diff(T2, T1) computes T2 - T1, so the later timestamp has to
    % come first; with the arguments swapped the elapsed time is never
    % positive and the periodic report never happens.
    case timer:now_diff(Now, erlang:get(last_stats_report)) >= ?STATS_DELAY of
    true ->
        ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
        erlang:put(last_stats_report, Now),
        #rep_stats{};
    false ->
        Stats
    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/01-load.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/01-load.t b/apps/couch_replicator/test/01-load.t
deleted file mode 100644
index b900297..0000000
--- a/apps/couch_replicator/test/01-load.t
+++ /dev/null
@@ -1,38 +0,0 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Test that we can load each module.

main(_) ->
    test_util:init_code_path(),
    Modules = [
        couch_replicator_api_wrap,
        couch_replicator_httpc,
        couch_replicator_httpd,
        couch_replicator_manager,
        couch_replicator_notifier,
        couch_replicator,
        couch_replicator_worker,
        couch_replicator_utils,
        couch_replicator_job_sup
    ],

    % One etap check per module.
    etap:plan(length(Modules)),
    lists:foreach(
        fun(Module) ->
            etap:loaded_ok(Module, lists:concat(["Loaded: ", Module]))
        end, Modules),
    etap:end_tests().
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/02-httpc-pool.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/02-httpc-pool.t b/apps/couch_replicator/test/02-httpc-pool.t
deleted file mode 100755
index 560b6a1..0000000
--- a/apps/couch_replicator/test/02-httpc-pool.t
+++ /dev/null
@@ -1,249 +0,0 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Script entry point: plans 55 etap checks, runs the suite and bails out with
% the failure term if the suite does not return ok.
main(_) ->
    etap:plan(55),
    Outcome = (catch test()),
    case Outcome of
    ok ->
        etap:end_tests();
    Failure ->
        etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
        etap:bail(Failure)
    end,
    ok.


% Runs the three pool scenarios between starting and stopping couch.
test() ->
    test_util:start_couch(),

    test_pool_full(),
    test_worker_dead_pool_non_full(),
    test_worker_dead_pool_full(),

    test_util:stop_couch(),
    ok.
% Scenario: a pool limited to 3 connections hands out three distinct workers,
% blocks a fourth client, and gives that client the worker released by the
% first client once it stops.
test_pool_full() ->
    Pool = spawn_pool(),
    Client1 = spawn_client(Pool),
    Client2 = spawn_client(Pool),
    Client3 = spawn_client(Pool),

    etap:diag("Check that we can spawn the max number of connections."),
    lists:foreach(
        fun({Client, Msg}) -> etap:is(ping_client(Client), ok, Msg) end,
        [
            {Client1, "Client 1 started ok."},
            {Client2, "Client 2 started ok."},
            {Client3, "Client 3 started ok."}
        ]),

    Worker1 = get_client_worker(Client1, "1"),
    Worker2 = get_client_worker(Client2, "2"),
    Worker3 = get_client_worker(Client3, "3"),
    lists:foreach(
        fun({Worker, Msg}) -> etap:is(is_process_alive(Worker), true, Msg) end,
        [
            {Worker1, "Client's 1 worker is alive."},
            {Worker2, "Client's 2 worker is alive."},
            {Worker3, "Client's 3 worker is alive."}
        ]),

    lists:foreach(
        fun({WorkerA, WorkerB, Msg}) -> etap:isnt(WorkerA, WorkerB, Msg) end,
        [
            {Worker1, Worker2, "Clients 1 and 2 got different workers."},
            {Worker2, Worker3, "Clients 2 and 3 got different workers."},
            {Worker1, Worker3, "Clients 1 and 3 got different workers."}
        ]),

    etap:diag("Check that client 4 blocks waiting for a worker."),
    Client4 = spawn_client(Pool),
    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),

    etap:diag("Check that stopping a client gives up its worker."),
    etap:is(stop_client(Client1), ok, "First client stopped."),

    etap:diag("And check that our blocked client has been unblocked."),
    etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),

    Worker4 = get_client_worker(Client4, "4"),
    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
    etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),

    StillRunning = [Client2, Client3, Client4],
    lists:foreach(fun(Client) -> ok = stop_client(Client) end, StillRunning),
    stop_pool(Pool).
% Scenario: with a free slot in the pool, killing a client's worker and then
% stopping that client still lets a new client obtain a fresh, live worker.
test_worker_dead_pool_non_full() ->
    Pool = spawn_pool(),
    Client1 = spawn_client(Pool),

    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
    Worker1 = get_client_worker(Client1, "1"),
    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),

    etap:diag("Kill client's 1 worker."),
    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),

    etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),

    Client2 = spawn_client(Pool),
    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
    Worker2 = get_client_worker(Client2, "2"),
    etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),

    etap:is(stop_client(Client2), ok, "Second client stopped."),
    stop_pool(Pool).


% Scenario: with the pool full, the death of a worker unblocks a waiting
% client with a brand new worker, while a worker released by a stopping client
% is handed over as-is to the next waiting client.
test_worker_dead_pool_full() ->
    Pool = spawn_pool(),
    Client1 = spawn_client(Pool),
    Client2 = spawn_client(Pool),
    Client3 = spawn_client(Pool),

    etap:diag("Check that we can spawn the max number of connections."),
    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
    etap:is(ping_client(Client3), ok, "Client 3 started ok."),

    Worker1 = get_client_worker(Client1, "1"),
    Worker2 = get_client_worker(Client2, "2"),
    Worker3 = get_client_worker(Client3, "3"),
    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),

    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),

    etap:diag("Check that client 4 blocks waiting for a worker."),
    Client4 = spawn_client(Pool),
    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),

    etap:diag("Kill client's 1 worker."),
    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),

    etap:diag("Check client 4 got unblocked after first worker's death"),
    etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),

    Worker4 = get_client_worker(Client4, "4"),
    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
    etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
    etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
    etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),

    etap:diag("Check that stopping client 1 is a noop."),
    etap:is(stop_client(Client1), ok, "First client stopped."),

    etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),

    etap:diag("Check that client 5 blocks waiting for a worker."),
    Client5 = spawn_client(Pool),
    etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),

    etap:diag("Check that stopping client 2 gives up its worker."),
    etap:is(stop_client(Client2), ok, "Second client stopped."),

    etap:diag("Now check that client 5 has been unblocked."),
    etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),

    Worker5 = get_client_worker(Client5, "5"),
    etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
    etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
    etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
    etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
    etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),

    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
    etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),

    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
    stop_pool(Pool).


% Spawns a client process that requests a worker from the pool (blocking
% inside the client, not the caller) and then serves loop/4.
% Returns {Pid, Ref}; Ref tags every reply the client sends to the caller.
spawn_client(Pool) ->
    Parent = self(),
    Ref = make_ref(),
    Pid = spawn(fun() ->
        {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
        loop(Parent, Ref, Worker, Pool)
    end),
    {Pid, Ref}.


% Returns ok if the client answers a ping within 3 seconds, i.e. it already
% holds a worker; returns timeout otherwise.
ping_client({Pid, Ref}) ->
    Pid ! ping,
    receive
        {pong, Ref} ->
            ok
    after 3000 ->
        timeout
    end.


% Returns the worker pid held by the client; bails out of the test run if the
% client does not answer within 3 seconds.
get_client_worker({Pid, Ref}, ClientName) ->
    Pid ! get_worker,
    receive
        {worker, Ref, Worker} ->
            Worker
    after 3000 ->
        etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
    end.


% Asks the client to release its worker and terminate.
% Returns ok, or timeout if no confirmation arrives within 3 seconds.
stop_client({Pid, Ref}) ->
    Pid ! stop,
    receive
        {stop, Ref} ->
            ok
    after 3000 ->
        timeout
    end.


% Looks up the client's worker and kills it.
% Returns ok, or timeout if the client does not answer within 3 seconds.
kill_client_worker({Pid, Ref}) ->
    Pid ! get_worker,
    receive
        {worker, Ref, Worker} ->
            exit(Worker, kill),
            ok
    after 3000 ->
        timeout
    end.


% Client process loop: answers ping and get_worker requests until told to
% stop, at which point the worker is released back to the pool.
loop(Parent, Ref, Worker, Pool) ->
    receive
        ping ->
            Parent ! {pong, Ref},
            loop(Parent, Ref, Worker, Pool);
        get_worker ->
            Parent ! {worker, Ref, Worker},
            loop(Parent, Ref, Worker, Pool);
        stop ->
            couch_replicator_httpc_pool:release_worker(Pool, Worker),
            Parent ! {stop, Ref}
    end.


% Starts an HTTP connection pool of at most 3 connections pointing at the
% configured httpd bind address and port.
spawn_pool() ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    % The configured port is used instead of a hard-coded 5984.
    % NOTE(review): if the port is configured as "0" (pick a free port) the
    % real port would have to be asked from the listener - confirm for the
    % test setup.
    Port = couch_config:get("httpd", "port", "5984"),
    {ok, Pool} = couch_replicator_httpc_pool:start_link(
        "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
    Pool.


% Stops the pool, asserting a clean shutdown.
stop_pool(Pool) ->
    ok = couch_replicator_httpc_pool:stop(Pool).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/03-replication-compact.t ---------------------------------------------------------------------- diff --git a/apps/couch_replicator/test/03-replication-compact.t b/apps/couch_replicator/test/03-replication-compact.t deleted file mode 100755 index 2deb9dd..0000000 --- a/apps/couch_replicator/test/03-replication-compact.t +++ /dev/null @@ -1,489 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- -%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap - -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -% Verify that compacting databases that are being used as the source or -% target of a replication doesn't affect the replication and that the -% replication doesn't hold their reference counters forever. - --define(b2l(B), binary_to_list(B)). - --record(user_ctx, { - name = null, - roles = [], - handler -}). - --record(db, { - main_pid = nil, - update_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - updater_fd, - fd_ref_counter, - header = nil, - committed_update_seq, - fulldocinfo_by_id_btree, - docinfo_by_seq_btree, - local_docs_btree, - update_seq, - name, - filepath, - validate_doc_funs = [], - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - options = [], - compression, - before_doc_update, - after_doc_read -}). 
% Local copy of the replication record.
-record(rep, {
    id,
    source,
    target,
    options,
    user_ctx,
    type = db,
    view = nil,
    doc_id
}).


% Names of the databases used as replication endpoints by this test.
source_db_name() -> <<"couch_test_rep_db_a">>.
target_db_name() -> <<"couch_test_rep_db_b">>.


% Script entry point: plans 376 etap checks, runs the suite and bails out with
% the failure term if the suite does not return ok.
main(_) ->
    etap:plan(376),
    Outcome = (catch test()),
    case Outcome of
    ok ->
        etap:end_tests();
    Failure ->
        etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
        etap:bail(Failure)
    end,
    ok.


% Runs the compaction scenario once for every local/remote combination of
% source and target. Couch is restarted after each combination.
test() ->
    test_util:start_couch(),

    LocalSource = source_db_name(),
    LocalTarget = target_db_name(),
    RemoteSource = {remote, source_db_name()},
    RemoteTarget = {remote, (target_db_name())},
    Pairs = [
        {LocalSource, LocalTarget},
        {RemoteSource, LocalTarget},
        {LocalSource, RemoteTarget},
        {RemoteSource, RemoteTarget}
    ],

    RunPair = fun({Source, Target}) ->
        {ok, SourceDb} = create_db(source_db_name()),
        etap:is(couch_db:is_idle(SourceDb), true,
            "Source database is idle before starting replication"),

        {ok, TargetDb} = create_db(target_db_name()),
        etap:is(couch_db:is_idle(TargetDb), true,
            "Target database is idle before starting replication"),

        {ok, RepPid, RepId} = replicate(Source, Target),
        check_active_tasks(RepPid, RepId, Source, Target),
        {ok, DocsWritten} = populate_and_compact_test(
            RepPid, SourceDb, TargetDb),

        wait_target_in_sync(DocsWritten, TargetDb),
        check_active_tasks(RepPid, RepId, Source, Target),
        cancel_replication(RepId, RepPid),
        compare_dbs(SourceDb, TargetDb),

        delete_db(SourceDb),
        delete_db(TargetDb),
        test_util:stop_couch(),
        ok = timer:sleep(1000),
        test_util:start_couch()
    end,
    lists:foreach(RunPair, Pairs),

    test_util:stop_couch(),
    ok.
% Runs five rounds of "compact both databases while a writer keeps adding
% documents to the source", checking after every compaction that the
% replication process and the database main pids are alive and that the old
% ref counters go away. Returns {ok, DocsWritten} as reported by the writer.
populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
    etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
    check_db_alive("source", SourceDb0),
    check_db_alive("target", TargetDb0),

    Writer = spawn_writer(SourceDb0),

    Round = fun(_, {SourceDb, TargetDb, DocCount}) ->
        % First half: compact both databases with the writer paused.
        pause_writer(Writer),

        compact_db("source", SourceDb),
        etap:is(is_process_alive(RepPid), true,
            "Replication process is alive after source database compaction"),
        check_db_alive("source", SourceDb),
        check_ref_counter("source", SourceDb),

        compact_db("target", TargetDb),
        etap:is(is_process_alive(RepPid), true,
            "Replication process is alive after target database compaction"),
        check_db_alive("target", TargetDb),
        check_ref_counter("target", TargetDb),

        {ok, SourceDb2} = reopen_db(SourceDb),
        {ok, TargetDb2} = reopen_db(TargetDb),

        resume_writer(Writer),
        wait_writer(Writer, DocCount),

        % Second half: compact while the writer is running; it is paused only
        % around the ref counter checks.
        compact_db("source", SourceDb2),
        etap:is(is_process_alive(RepPid), true,
            "Replication process is alive after source database compaction"),
        check_db_alive("source", SourceDb2),
        pause_writer(Writer),
        check_ref_counter("source", SourceDb2),
        resume_writer(Writer),

        compact_db("target", TargetDb2),
        etap:is(is_process_alive(RepPid), true,
            "Replication process is alive after target database compaction"),
        check_db_alive("target", TargetDb2),
        pause_writer(Writer),
        check_ref_counter("target", TargetDb2),
        resume_writer(Writer),

        {ok, SourceDb3} = reopen_db(SourceDb2),
        {ok, TargetDb3} = reopen_db(TargetDb2),
        {SourceDb3, TargetDb3, DocCount + 50}
    end,
    lists:foldl(Round, {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),

    DocsWritten = stop_writer(Writer),
    {ok, DocsWritten}.


% Checks that the main pid of a local database handle is alive.
check_db_alive(Type, #db{main_pid = MainPid}) ->
    Alive = is_process_alive(MainPid),
    etap:is(Alive, true, "Local " ++ Type ++ " database main pid is alive").
% Compacts the database named in the #db{} record and waits up to 30 seconds
% for the compactor process to exit. Type ("source" or "target") is only used
% in messages.
compact_db(Type, #db{name = Name}) ->
    {ok, Db} = couch_db:open_int(Name, []),
    {ok, CompactPid} = couch_db:start_compact(Db),
    MonRef = erlang:monitor(process, CompactPid),
    receive
        {'DOWN', MonRef, process, CompactPid, normal} ->
            ok;
        {'DOWN', MonRef, process, CompactPid, noproc} ->
            % The monitor is set after start_compact/1 returns, so a quick
            % compaction may already be over. erlang:monitor/2 then delivers
            % 'DOWN' with reason noproc, which is not a compaction failure.
            ok;
        {'DOWN', MonRef, process, CompactPid, Reason} ->
            etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
                ": " ++ couch_util:to_list(Reason))
    after 30000 ->
        etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
            " didn't finish")
    end,
    ok = couch_db:close(Db).


% Waits up to 30 seconds for the fd_ref_counter process recorded in the given
% (pre-compaction) #db{} to terminate, then re-opens the database and asserts
% that it now carries a different ref counter.
check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
    MonRef = erlang:monitor(process, OldRefCounter),
    receive
        {'DOWN', MonRef, process, OldRefCounter, _} ->
            etap:diag("Old " ++ Type ++ " database ref counter terminated")
    after 30000 ->
        etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
    end,
    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
    ok = couch_db:close(Db),
    etap:isnt(
        NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").


% Opens the database by name to get a fresh #db{} record and closes it again
% right away. Callers in this file only read fields of the returned record
% (name, main_pid, fd_ref_counter).
reopen_db(#db{name = Name}) ->
    {ok, Db} = couch_db:open_int(Name, []),
    ok = couch_db:close(Db),
    {ok, Db}.


% Polls the target database until its doc_count equals DocCount:
% 300 attempts, 100 ms apart.
wait_target_in_sync(DocCount, #db{name = TargetName}) ->
    wait_target_in_sync_loop(DocCount, TargetName, 300).


wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
    etap:bail("Could not get source and target databases in sync");
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
    {ok, Target} = couch_db:open_int(TargetName, []),
    {ok, TargetInfo} = couch_db:get_db_info(Target),
    ok = couch_db:close(Target),
    case couch_util:get_value(doc_count, TargetInfo) =:= DocCount of
        true ->
            etap:diag("Source and target databases are in sync");
        false ->
            ok = timer:sleep(100),
            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
    end.
% Walks every document of the source database and checks that the target
% holds a document with the same id whose JSON form (attachments included)
% is identical. Any difference is reported through etap:bail/1.
compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
    {ok, SourceDb} = couch_db:open_int(SourceName, []),
    {ok, TargetDb} = couch_db:open_int(TargetName, []),

    CheckDoc = fun(FullDocInfo, _, Acc) ->
        {ok, SourceDoc} = couch_db:open_doc(SourceDb, FullDocInfo),
        SourceJson = couch_doc:to_json_obj(SourceDoc, [attachments]),
        {Props} = SourceJson,
        DocId = couch_util:get_value(<<"_id">>, Props),
        TargetDoc =
            case couch_db:open_doc(TargetDb, DocId) of
                {ok, Opened} ->
                    Opened;
                Error ->
                    etap:bail("Error opening document '" ++ ?b2l(DocId) ++
                        "' from target: " ++ couch_util:to_list(Error))
            end,
        TargetJson = couch_doc:to_json_obj(TargetDoc, [attachments]),
        case TargetJson =:= SourceJson of
            true ->
                ok;
            false ->
                etap:bail("Content from document '" ++ ?b2l(DocId) ++
                    "' differs in target database")
        end,
        {ok, Acc}
    end,

    {ok, _, _} = couch_db:enum_docs(SourceDb, CheckDoc, [], []),
    etap:diag("Target database has the same documents as the source database"),
    ok = couch_db:close(SourceDb),
    ok = couch_db:close(TargetDb).
% Asserts that couch_task_status:all() lists exactly one task and that its
% properties describe this replication: pid, replication id, continuous flag,
% source/target, integer counters and a progress value not above 100.
check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
    % Expected endpoint value: a remote database is compared against its URL
    % with a trailing slash, a local one against the name as given.
    Endpoint = fun
        ({remote, Name}) -> <<(db_url(Name))/binary, $/>>;
        (Local) -> Local
    end,
    Source = Endpoint(Src),
    Target = Endpoint(Tgt),
    FullRepId = list_to_binary(BaseId ++ Ext),
    Pid = list_to_binary(pid_to_list(RepPid)),
    [RepTask] = couch_task_status:all(),
    Prop = fun(Key) -> couch_util:get_value(Key, RepTask) end,

    lists:foreach(
        fun({Key, Expected, Msg}) -> etap:is(Prop(Key), Expected, Msg) end,
        [
            {pid, Pid,
                "_active_tasks entry has correct pid property"},
            {replication_id, FullRepId,
                "_active_tasks entry has right replication id"},
            {continuous, true,
                "_active_tasks entry has continuous property set to true"},
            {source, Source,
                "_active_tasks entry has correct source property"},
            {target, Target,
                "_active_tasks entry has correct target property"}
        ]),

    lists:foreach(
        fun(Key) ->
            etap:is(is_integer(Prop(Key)), true,
                "_active_tasks entry has integer " ++ atom_to_list(Key) ++
                " property")
        end,
        [
            docs_read,
            docs_written,
            doc_write_failures,
            revisions_checked,
            missing_revisions_found,
            checkpointed_source_seq,
            source_seq
        ]),

    Progress = Prop(progress),
    etap:is(is_integer(Progress), true,
        "_active_tasks entry has an integer progress property"),
    etap:is(Progress =< 100, true, "Progress is not greater than 100%").


% Keeps asking the writer for its document count until it reaches NumDocs.
wait_writer(Pid, NumDocs) ->
    case get_writer_num_docs_written(Pid) >= NumDocs of
        true ->
            ok;
        false ->
            wait_writer(Pid, NumDocs)
    end.


% Spawns the process that keeps adding documents to the given database
% (see writer_loop/3) and returns its pid.
spawn_writer(Db) ->
    Self = self(),
    Writer = spawn(fun() -> writer_loop(Db, Self, 0) end),
    etap:diag("Started source database writer"),
    Writer.


% Asks the writer to pause and waits up to 30 seconds for its acknowledgement.
pause_writer(Pid) ->
    Tag = make_ref(),
    Pid ! {pause, Tag},
    receive
        {paused, Tag} ->
            ok
    after 30000 ->
        etap:bail("Failed to pause source database writer")
    end.


% Asks a paused writer to continue and waits up to 30 seconds for its
% acknowledgement.
resume_writer(Pid) ->
    Tag = make_ref(),
    Pid ! {continue, Tag},
    receive
        {ok, Tag} ->
            ok
    after 30000 ->
        etap:bail("Failed to unpause source database writer")
    end.


% Returns the count the writer reports in reply to a get_count request.
get_writer_num_docs_written(Pid) ->
    Tag = make_ref(),
    Pid ! {get_count, Tag},
    receive
        {count, Tag, Total} ->
            Total
    after 30000 ->
        etap:bail("Timeout getting number of documents written from "
            "source database writer")
    end.


% Tells the writer to stop, waits for its final count and then for the
% process itself to go down. Returns the final count.
stop_writer(Pid) ->
    Tag = make_ref(),
    Pid ! {stop, Tag},
    receive
        {stopped, Tag, DocsWritten} ->
            MonRef = erlang:monitor(process, Pid),
            receive
                {'DOWN', MonRef, process, Pid, _Reason} ->
                    etap:diag("Stopped source database writer"),
                    DocsWritten
            after 30000 ->
                etap:bail("Timeout stopping source database writer")
            end
    after 30000 ->
        etap:bail("Timeout stopping source database writer")
    end.
% Body of the writer process. Each iteration stores one document whose id
% and "value" are Counter + 1 and which carries two PNG attachments built
% from att_data/0. It then answers a pending get_count or stop request, or
% sleeps 500 ms and loops. Pause requests are served by maybe_pause/2,
% before and after the document is built.
writer_loop(#db{name = DbName}, Parent, Counter) ->
    maybe_pause(Parent, Counter),
    Next = Counter + 1,
    PngAtt = fun(Bytes) ->
        {[
            {<<"data">>, base64:encode(Bytes)},
            {<<"content_type">>, <<"image/png">>}
        ]}
    end,
    Icon1 = PngAtt(att_data()),
    Icon2 = PngAtt(iolist_to_binary([att_data(), att_data()])),
    Doc = couch_doc:from_json_obj({[
        {<<"_id">>, list_to_binary(integer_to_list(Next))},
        {<<"value">>, Next},
        {<<"_attachments">>, {[
            {<<"icon1.png">>, Icon1},
            {<<"icon2.png">>, Icon2}
        ]}}
    ]}),
    maybe_pause(Parent, Counter),
    {ok, Db} = couch_db:open_int(DbName, []),
    {ok, _} = couch_db:update_doc(Db, Doc, []),
    ok = couch_db:close(Db),
    receive
        {get_count, Tag} ->
            Parent ! {count, Tag, Next},
            writer_loop(Db, Parent, Next);
        {stop, Tag} ->
            % Last message of the writer; the process ends here.
            Parent ! {stopped, Tag, Next}
    after 0 ->
        ok = timer:sleep(500),
        writer_loop(Db, Parent, Next)
    end.


% Non-blocking check of the writer's mailbox. A get_count request is
% answered with Counter. A pause request is acknowledged and then the
% writer blocks, with no timeout, until a continue request arrives.
maybe_pause(Parent, Counter) ->
    receive
        {get_count, CountTag} ->
            Parent ! {count, CountTag, Counter};
        {pause, PauseTag} ->
            Parent ! {paused, PauseTag},
            receive
                {continue, ContinueTag} ->
                    Parent ! {ok, ContinueTag}
            end
    after 0 ->
        ok
    end.


% Builds "http://<bind_address>:<port>/<DbName>" for the running HTTP
% server, as a binary.
db_url(DbName) ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    Port = integer_to_list(mochiweb_socket_server:get(couch_http, port)),
    iolist_to_binary(["http://", Host, ":", Port, "/", DbName]).


% Creates (overwriting any existing one) the database as an admin, closes
% it and returns the #db{} record.
create_db(DbName) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    {ok, Db} = couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]),
    couch_db:close(Db),
    {ok, Db}.


% Deletes the database as an admin and waits up to 30 seconds for the main
% pid recorded in the #db{} to go down.
delete_db(#db{name = DbName, main_pid = MainPid}) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    ok = couch_server:delete(DbName, [{user_ctx, AdminCtx}]),
    MonRef = erlang:monitor(process, MainPid),
    receive
        {'DOWN', MonRef, process, MainPid, _Reason} ->
            ok
    after 30000 ->
        etap:bail("Timeout deleting database")
    end.
% Starts a continuous replication between the two endpoints. An endpoint
% given as {remote, Name} is first replaced by its URL (db_url/1).
% Returns {ok, ReplicationPid, ReplicationId}.
replicate({remote, Db}, Target) ->
    replicate(db_url(Db), Target);

replicate(Source, {remote, Db}) ->
    replicate(Source, db_url(Db));

replicate(Source, Target) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    RepObject = {[
        {<<"source">>, Source},
        {<<"target">>, Target},
        {<<"continuous">>, true}
    ]},
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, AdminCtx),
    {ok, Pid} = couch_replicator:async_replicate(Rep),
    {ok, Pid, Rep#rep.id}.


% Cancels the replication and asserts that its process is gone afterwards.
cancel_replication(RepId, RepPid) ->
    {ok, _} = couch_replicator:cancel_replication(RepId),
    StillAlive = is_process_alive(RepPid),
    etap:is(StillAlive, false,
        "Replication process is no longer alive after cancel").


% Reads the logo image used as attachment payload; the file is read on
% every call.
att_data() ->
    Path = test_util:build_file("share/www/image/logo.png"),
    {ok, Data} = file:read_file(Path),
    Data.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/04-replication-large-atts.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/04-replication-large-atts.t b/apps/couch_replicator/test/04-replication-large-atts.t
deleted file mode 100755
index c001f3e..0000000
--- a/apps/couch_replicator/test/04-replication-large-atts.t
+++ /dev/null
@@ -1,266 +0,0 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Test replication of large attachments. Verify that both source and
% target have the same attachment data and metadata.
-define(b2l(Bin), binary_to_list(Bin)).

% Local copies of the record definitions this script needs.
-record(user_ctx, {
    name = null,
    roles = [],
    handler
}).

-record(doc, {
    id = <<"">>,
    revs = {0, []},
    body = {[]},
    atts = [],
    deleted = false,
    meta = []
}).

-record(att, {
    name,
    type,
    att_len,
    disk_len,
    md5= <<>>,
    revpos=0,
    data,
    encoding=identity
}).


% Names of the databases used on each side of the replication.
source_db_name() -> <<"couch_test_rep_db_a">>.
target_db_name() -> <<"couch_test_rep_db_b">>.


% escript entry point. Plans 1192 etap assertions, runs test/0 under `catch`
% and reports any result other than `ok` as an abnormal death.
main(_) ->
    etap:plan(1192),
    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        _ ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Outcome])),
            etap:bail(Outcome)
    end,
    ok.


% Populates the source database once (11 documents), then for each
% source/target combination creates a fresh target, replicates into it,
% compares both databases and deletes the target again.
test() ->
    test_util:start_couch(),

    % Only text/* attachments are compressible for this run.
    couch_config:set("attachments", "compressible_types", "text/*", false),

    SourceName = source_db_name(),
    TargetName = target_db_name(),
    % local->local, remote->local, local->remote, remote->remote
    Pairs = [
        {Src, Tgt}
        || Tgt <- [TargetName, {remote, TargetName}],
           Src <- [SourceName, {remote, SourceName}]
    ],

    {ok, SourceDb} = create_db(SourceName),
    etap:diag("Populating source database"),
    populate_db(SourceDb, 11),
    ok = couch_db:close(SourceDb),

    RunPair = fun({Source, Target}) ->
        etap:diag("Creating target database"),
        {ok, TargetDb} = create_db(TargetName),

        ok = couch_db:close(TargetDb),
        etap:diag("Triggering replication"),
        replicate(Source, Target),
        etap:diag("Replication finished, comparing source and target databases"),
        compare_dbs(SourceDb, TargetDb),

        etap:diag("Deleting target database"),
        delete_db(TargetDb),
        ok = timer:sleep(1000)
    end,
    lists:foreach(RunPair, Pairs),

    delete_db(SourceDb),
    test_util:stop_couch(),
    ok.
% Stores DocCount documents named "doc1".."docN", each with an empty body
% and two attachments: a 2 MiB text/plain one ("att1") and a ~6.6 MiB
% app/binary one ("att2").
populate_db(Db, DocCount) ->
    MakeDoc = fun(N) ->
        #doc{
            id = iolist_to_binary(["doc", integer_to_list(N)]),
            body = {[]},
            atts = [
                att(<<"att1">>, 2 * 1024 * 1024, <<"text/plain">>),
                att(<<"att2">>, round(6.6 * 1024 * 1024), <<"app/binary">>)
            ]
        }
    end,
    % Highest counter first, i.e. the same list a prepend-fold produces.
    Docs = lists:reverse([MakeDoc(N) || N <- lists:seq(1, DocCount)]),
    {ok, _} = couch_db:update_docs(Db, Docs, []).


% Builds an attachment of the given size whose data is a fun that returns
% the requested number of random bytes.
att(Name, Size, Type) ->
    RandomChunk = fun(ByteCount) -> crypto:rand_bytes(ByteCount) end,
    #att{
        name = Name,
        type = Type,
        att_len = Size,
        data = RandomChunk
    }.


% For every source document: same body on the target, same attachment names,
% and for each attachment the same encoding, content, lengths, type and MD5.
% "att1" is expected to be gzip encoded, every other attachment identity
% encoded.
compare_dbs(Source, Target) ->
    {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
    {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),

    CheckAtt = fun(#att{name = AttName} = Att, TargetAtts) ->
        etap:diag("Verifying attachment " ++ ?b2l(AttName)),

        {ok, AttTarget} = find_att(TargetAtts, AttName),
        SourceMd5 = att_md5(Att),
        TargetMd5 = att_md5(AttTarget),
        case AttName of
            <<"att1">> ->
                etap:is(Att#att.encoding, gzip,
                    "Attachment is gzip encoded in source database"),
                etap:is(AttTarget#att.encoding, gzip,
                    "Attachment is gzip encoded in target database"),
                DecSourceMd5 = att_decoded_md5(Att),
                DecTargetMd5 = att_decoded_md5(AttTarget),
                etap:is(DecTargetMd5, DecSourceMd5,
                    "Same identity content in source and target databases");
            _ ->
                etap:is(Att#att.encoding, identity,
                    "Attachment is not encoded in source database"),
                etap:is(AttTarget#att.encoding, identity,
                    "Attachment is not encoded in target database")
        end,
        etap:is(TargetMd5, SourceMd5,
            "Same content in source and target databases"),
        lists:foreach(
            fun({Len, Msg}) -> etap:is(is_integer(Len), true, Msg) end,
            [
                {Att#att.disk_len,
                    "#att.disk_len is an integer in source database"},
                {Att#att.att_len,
                    "#att.att_len is an integer in source database"},
                {AttTarget#att.disk_len,
                    "#att.disk_len is an integer in target database"},
                {AttTarget#att.att_len,
                    "#att.att_len is an integer in target database"}
            ]),
        lists:foreach(
            fun({Got, Expected, Msg}) -> etap:is(Got, Expected, Msg) end,
            [
                {Att#att.disk_len, AttTarget#att.disk_len,
                    "Same identity length in source and target databases"},
                {Att#att.att_len, AttTarget#att.att_len,
                    "Same encoded length in source and target databases"},
                {Att#att.type, AttTarget#att.type,
                    "Same type in source and target databases"},
                {Att#att.md5, SourceMd5, "Correct MD5 in source database"},
                {AttTarget#att.md5, SourceMd5, "Correct MD5 in target database"}
            ])
    end,

    CheckDoc = fun(FullDocInfo, _, Acc) ->
        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
        Id = DocSource#doc.id,

        etap:diag("Verifying document " ++ ?b2l(Id)),

        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
        etap:is(DocTarget#doc.body, DocSource#doc.body,
            "Same body in source and target databases"),

        SourceAtts = DocSource#doc.atts,
        TargetAtts = DocTarget#doc.atts,
        etap:is(
            lists:sort([N || #att{name = N} <- SourceAtts]),
            lists:sort([N || #att{name = N} <- TargetAtts]),
            "Document has same number (and names) of attachments in "
            "source and target databases"),

        lists:foreach(fun(Att) -> CheckAtt(Att, TargetAtts) end, SourceAtts),

        {ok, Acc}
    end,

    {ok, _, _} = couch_db:enum_docs(SourceDb, CheckDoc, [], []),
    ok = couch_db:close(SourceDb),
    ok = couch_db:close(TargetDb).


% Returns {ok, Att} for the first attachment with the given name, or nil.
find_att(Atts, Name) ->
    case [Att || #att{name = N} = Att <- Atts, N =:= Name] of
        [First | _] ->
            {ok, First};
        [] ->
            nil
    end.


% MD5 of the attachment data as folded by couch_doc:att_foldl/3.
att_md5(Att) ->
    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
    FinalCtx = couch_doc:att_foldl(Att, Feed, couch_util:md5_init()),
    couch_util:md5_final(FinalCtx).

% MD5 of the attachment data as folded by couch_doc:att_foldl_decode/3.
att_decoded_md5(Att) ->
    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
    FinalCtx = couch_doc:att_foldl_decode(Att, Feed, couch_util:md5_init()),
    couch_util:md5_final(FinalCtx).


% Builds "http://<bind_address>:<port>/<DbName>" for the running HTTP
% server, as a binary.
db_url(DbName) ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    Port = integer_to_list(mochiweb_socket_server:get(couch_http, port)),
    iolist_to_binary(["http://", Host, ":", Port, "/", DbName]).
% Creates (overwriting any existing one) the database as an admin and
% returns whatever couch_db:create/2 returns.
create_db(DbName) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]).


% Deletes the database behind the given handle, as an admin.
delete_db(Db) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    ok = couch_server:delete(couch_db:name(Db), [{user_ctx, AdminCtx}]).


% Runs a one-shot replication and waits up to 300000 ms (5 minutes) for the
% replication process to exit, asserting that it exits with reason `normal`.
% An endpoint given as {remote, Name} is first replaced by its URL.
replicate({remote, Db}, Target) ->
    replicate(db_url(Db), Target);

replicate(Source, {remote, Db}) ->
    replicate(Source, db_url(Db));

replicate(Source, Target) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    RepObject = {[
        {<<"source">>, Source},
        {<<"target">>, Target}
    ]},
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, AdminCtx),
    {ok, Pid} = couch_replicator:async_replicate(Rep),
    MonRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MonRef, process, Pid, ExitReason} ->
            etap:is(ExitReason, normal, "Replication finished successfully")
    after 300000 ->
        etap:bail("Timeout waiting for replication to finish")
    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/05-replication-many-leaves.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/05-replication-many-leaves.t b/apps/couch_replicator/test/05-replication-many-leaves.t
deleted file mode 100755
index c4c0007..0000000
--- a/apps/couch_replicator/test/05-replication-many-leaves.t
+++ /dev/null
@@ -1,295 +0,0 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% Test replication of documents with many leaf revisions.
% Motivated by COUCHDB-1340 and other similar issues where a document
% GET with a too long ?open_revs revision list doesn't work due to
% maximum web server limits for the HTTP request path.

% Local copies of the record definitions this script needs.
-record(user_ctx, {
    name = null,
    roles = [],
    handler
}).

-record(doc, {
    id = <<"">>,
    revs = {0, []},
    body = {[]},
    atts = [],
    deleted = false,
    meta = []
}).

-record(att, {
    name,
    type,
    att_len,
    disk_len,
    md5= <<>>,
    revpos=0,
    data,
    encoding=identity
}).

% Shorthand conversions used when building messages.
-define(b2l(B), binary_to_list(B)).
-define(l2b(L), list_to_binary(L)).
-define(i2l(I), integer_to_list(I)).


% Names of the databases used on each side of the replication.
source_db_name() -> <<"couch_test_rep_db_a">>.
target_db_name() -> <<"couch_test_rep_db_b">>.

% Ids of the documents created in the source database.
doc_ids() ->
    [<<"doc1">>, <<"doc2">>, <<"doc3">>].

% Number of extra (conflicting) leaf revisions added to each document.
doc_num_conflicts(<<"doc1">>) -> 10;
doc_num_conflicts(<<"doc2">>) -> 100;
% a number > MaxURLlength (7000) / length(DocRevisionString)
doc_num_conflicts(<<"doc3">>) -> 210.


% escript entry point. Sets up the code path, plans 56 etap assertions, runs
% test/0 under `catch` and reports any result other than `ok` as an abnormal
% death.
main(_) ->
    test_util:init_code_path(),

    etap:plan(56),
    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        _ ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Outcome])),
            etap:bail(Outcome)
    end,
    ok.
% For each source/target combination: create and populate the source,
% create the target, replicate and verify. Then add attachments to every
% leaf revision on the source, replicate again and verify again. Finally
% delete both databases.
test() ->
    test_util:start_couch(),

    couch_config:set("replicator", "connection_timeout", "90000", false),

    SourceName = source_db_name(),
    TargetName = target_db_name(),
    % local->local, remote->local, local->remote, remote->remote
    Pairs = [
        {Src, Tgt}
        || Tgt <- [TargetName, {remote, TargetName}],
           Src <- [SourceName, {remote, SourceName}]
    ],

    % Opens both databases, verifies the target against the given revision
    % list and closes both again.
    Verify = fun(Revs) ->
        {ok, Src} = couch_db:open_int(SourceName, []),
        {ok, Tgt} = couch_db:open_int(TargetName, []),
        verify_target(Src, Tgt, Revs),
        ok = couch_db:close(Src),
        ok = couch_db:close(Tgt)
    end,

    RunPair = fun({Source, Target}) ->
        {ok, SourceDb} = create_db(SourceName),
        etap:diag("Populating source database"),
        {ok, DocRevs} = populate_db(SourceDb),
        ok = couch_db:close(SourceDb),
        etap:diag("Creating target database"),
        {ok, TargetDb} = create_db(TargetName),

        ok = couch_db:close(TargetDb),
        etap:diag("Triggering replication"),
        replicate(Source, Target),
        etap:diag("Replication finished, comparing source and target databases"),
        Verify(DocRevs),

        {ok, SourceDb3} = couch_db:open_int(SourceName, []),
        {ok, DocRevs2} = add_attachments(SourceDb3, DocRevs, 2),
        ok = couch_db:close(SourceDb3),
        etap:diag("Triggering replication again"),
        replicate(Source, Target),
        etap:diag("Replication finished, comparing source and target databases"),
        Verify(DocRevs2),

        etap:diag("Deleting source and target databases"),
        delete_db(TargetDb),
        delete_db(SourceDb),
        ok = timer:sleep(1000)
    end,
    lists:foreach(RunPair, Pairs),

    test_util:stop_couch(),
    ok.
% Creates every document of doc_ids/0 with value <<"0">>, adds
% doc_num_conflicts(DocId) sibling revisions to each, and returns
% {ok, [{DocId, Revs}]} in dict:to_list/1 order.
populate_db(Db) ->
    CreateDoc = fun(DocId) ->
        Doc = #doc{
            id = DocId,
            body = {[ {<<"value">>, <<"0">>} ]}
        },
        {ok, Rev} = couch_db:update_doc(Db, Doc, []),
        {ok, SiblingRevs} =
            add_doc_siblings(Db, DocId, doc_num_conflicts(DocId)),
        {DocId, [Rev | SiblingRevs]}
    end,
    DocRevsDict = dict:from_list(lists:map(CreateDoc, doc_ids())),
    {ok, dict:to_list(DocRevsDict)}.


% Adds NumLeaves extra revisions at position 1 to the document and returns
% {ok, Revs} with their {1, RevId} pairs.
add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
    add_doc_siblings(Db, DocId, NumLeaves, [], []).


% All siblings built: write them in one batch as replicated changes.
add_doc_siblings(Db, _DocId, 0, Docs, Revs) ->
    {ok, []} = couch_db:update_docs(Db, Docs, [], replicated_changes),
    {ok, Revs};

% The revision id of each sibling is the MD5 of its value, and its value is
% the remaining-leaves counter as a binary.
add_doc_siblings(Db, DocId, Remaining, Docs, Revs) ->
    Value = ?l2b(?i2l(Remaining)),
    RevId = couch_util:md5(Value),
    Sibling = #doc{
        id = DocId,
        revs = {1, [RevId]},
        body = {[ {<<"value">>, Value} ]}
    },
    add_doc_siblings(
        Db, DocId, Remaining - 1, [Sibling | Docs], [{1, RevId} | Revs]).


% For every {DocId, RevList} entry: opens the listed revisions on both sides,
% asserts the target has doc_num_conflicts(DocId) + 1 of them and that each
% target revision has the same JSON (attachments included) as the source
% revision at the same position.
verify_target(SourceDb, TargetDb, DocIdRevs) ->
    OpenOpts = [conflicts, deleted_conflicts],
    VerifyDoc = fun({DocId, RevList}) ->
        {ok, TargetLookups} =
            couch_db:open_doc_revs(TargetDb, DocId, RevList, OpenOpts),
        TargetDocs = [Doc || {ok, Doc} <- TargetLookups],
        {ok, SourceLookups} =
            couch_db:open_doc_revs(SourceDb, DocId, RevList, OpenOpts),
        SourceDocs = [Doc || {ok, Doc} <- SourceLookups],
        Total = doc_num_conflicts(DocId) + 1,
        etap:is(
            length(TargetDocs),
            Total,
            "Target has " ++ ?i2l(Total) ++ " leaf revisions of document " ++ ?b2l(DocId)),
        etap:diag("Verifying all revisions of document " ++ ?b2l(DocId)),
        Paired = lists:zip(TargetDocs, SourceDocs),
        lists:foreach(
            % The head requires both documents of a pair to share id and revs.
            fun({#doc{id = Id, revs = Revs} = TgtDoc,
                 #doc{id = Id, revs = Revs} = SrcDoc}) ->
                SourceJson = couch_doc:to_json_obj(SrcDoc, [attachments]),
                TargetJson = couch_doc:to_json_obj(TgtDoc, [attachments]),
                case TargetJson =:= SourceJson of
                    true ->
                        ok;
                    false ->
                        {Pos, [Rev | _]} = Revs,
                        etap:bail("Wrong value for revision " ++
                            ?b2l(couch_doc:rev_to_str({Pos, Rev})) ++
                            " of document " ++ ?b2l(DocId))
                end
            end,
            Paired)
    end,
    lists:foreach(VerifyDoc, DocIdRevs).


% Adds NumAtts random 100-byte attachments to every listed revision of every
% document and returns {ok, [{DocId, NewRevs}]}. The result list is in
% reverse order of the input list.
add_attachments(Source, DocIdRevs, NumAtts) ->
    add_attachments(Source, DocIdRevs, NumAtts, []).

add_attachments(_SourceDb, [], _NumAtts, Acc) ->
    {ok, Acc};

add_attachments(SourceDb, [{DocId, RevList} | Rest], NumAtts, IdRevsAcc) ->
    {ok, SourceLookups} = couch_db:open_doc_revs(
        SourceDb,
        DocId,
        RevList,
        []),
    SourceDocs = [Doc || {ok, Doc} <- SourceLookups],
    Total = doc_num_conflicts(DocId) + 1,
    etap:is(
        length(SourceDocs),
        Total,
        "Source still has " ++ ?i2l(Total) ++
            " leaf revisions of document " ++ ?b2l(DocId)),
    etap:diag("Adding " ++ ?i2l(NumAtts) ++
        " attachments to each revision of the document " ++ ?b2l(DocId)),

    % Attachment names embed the index and the revision they are added to.
    MakeAtt = fun(I, Pos, Rev) ->
        AttData = crypto:rand_bytes(100),
        #att{
            name = iolist_to_binary(
                ["att_", ?i2l(I), "_", couch_doc:rev_to_str({Pos, Rev})]),
            type = <<"application/foobar">>,
            att_len = byte_size(AttData),
            data = AttData
        }
    end,
    Extend = fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc) ->
        % Highest index first, as a prepend-fold over 1..NumAtts gives.
        NewAtts = lists:reverse(
            [MakeAtt(I, Pos, Rev) || I <- lists:seq(1, NumAtts)]),
        Doc#doc{atts = Atts ++ NewAtts}
    end,
    NewDocs = lists:reverse(lists:map(Extend, SourceDocs)),

    {ok, UpdateResults} = couch_db:update_docs(SourceDb, NewDocs, []),
    NewRevs = [R || {ok, R} <- UpdateResults],
    etap:is(
        length(NewRevs),
        length(NewDocs),
        "Document revisions updated with " ++ ?i2l(NumAtts) ++ " attachments"),
    add_attachments(SourceDb, Rest, NumAtts, [{DocId, NewRevs} | IdRevsAcc]).


% Builds "http://<bind_address>:<port>/<DbName>" for the running HTTP
% server, as a binary.
db_url(DbName) ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    Port = integer_to_list(mochiweb_socket_server:get(couch_http, port)),
    iolist_to_binary(["http://", Host, ":", Port, "/", DbName]).


% Creates (overwriting any existing one) the database as an admin and
% returns whatever couch_db:create/2 returns.
create_db(DbName) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]).
% Deletes the database behind the given handle, as an admin.
delete_db(Db) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    ok = couch_server:delete(couch_db:name(Db), [{user_ctx, AdminCtx}]).


% Runs a one-shot replication and waits up to 900000 ms (15 minutes) for the
% replication process to exit, asserting that it exits with reason `normal`.
% An endpoint given as {remote, Name} is first replaced by its URL.
replicate({remote, Db}, Target) ->
    replicate(db_url(Db), Target);

replicate(Source, {remote, Db}) ->
    replicate(Source, db_url(Db));

replicate(Source, Target) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    RepObject = {[
        {<<"source">>, Source},
        {<<"target">>, Target}
    ]},
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, AdminCtx),
    {ok, Pid} = couch_replicator:async_replicate(Rep),
    MonRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MonRef, process, Pid, ExitReason} ->
            etap:is(ExitReason, normal, "Replication finished successfully")
    after 900000 ->
        etap:bail("Timeout waiting for replication to finish")
    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/06-doc-missing-stubs.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/06-doc-missing-stubs.t b/apps/couch_replicator/test/06-doc-missing-stubs.t
deleted file mode 100755
index 5db5830..0000000
--- a/apps/couch_replicator/test/06-doc-missing-stubs.t
+++ /dev/null
@@ -1,303 +0,0 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Test replication of a document with an attachment into a target database
% that has a small revs limit. After the first replication the source
% document is updated more times than that limit, then replication runs
% again and both databases are compared. See COUCHDB-1365.

% Local copies of the record definitions this script needs.
-record(user_ctx, {
    name = null,
    roles = [],
    handler
}).

-record(doc, {
    id = <<"">>,
    revs = {0, []},
    body = {[]},
    atts = [],
    deleted = false,
    meta = []
}).

-record(att, {
    name,
    type,
    att_len,
    disk_len,
    md5= <<>>,
    revpos=0,
    data,
    encoding=identity
}).

-define(b2l(B), binary_to_list(B)).

% Names of the databases used on each side of the replication.
source_db_name() -> <<"couch_test_rep_db_a">>.
target_db_name() -> <<"couch_test_rep_db_b">>.

% Revs limit set on the target database.
target_revs_limit() -> 3.


% escript entry point. Plans 128 etap assertions, runs test/0 under `catch`
% and reports any result other than `ok` as an abnormal death.
main(_) ->
    etap:plan(128),
    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        _ ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Outcome])),
            etap:bail(Outcome)
    end,
    ok.


% Test motivated by COUCHDB-1365.
test() ->
    test_util:start_couch(),

    SourceName = source_db_name(),
    TargetName = target_db_name(),
    % local->local, remote->local, local->remote, remote->remote
    Pairs = [
        {Src, Tgt}
        || Tgt <- [TargetName, {remote, TargetName}],
           Src <- [SourceName, {remote, SourceName}]
    ],

    RunPair = fun({Source, Target}) ->
        {ok, SourceDb} = create_db(SourceName),
        etap:diag("Populating source database"),
        populate_db(SourceDb),
        ok = couch_db:close(SourceDb),

        etap:diag("Creating target database"),
        {ok, TargetDb} = create_db(TargetName),
        ok = couch_db:set_revs_limit(TargetDb, target_revs_limit()),
        ok = couch_db:close(TargetDb),

        etap:diag("Triggering replication"),
        replicate(Source, Target),
        etap:diag("Replication finished, comparing source and target databases"),
        compare_dbs(SourceDb, TargetDb),

        % Two more updates than the target keeps revisions for.
        etap:diag("Updating source database docs"),
        update_db_docs(couch_db:name(SourceDb), target_revs_limit() + 2),

        etap:diag("Triggering replication again"),
        replicate(Source, Target),
        etap:diag("Replication finished, comparing source and target databases"),
        compare_dbs(SourceDb, TargetDb),

        etap:diag("Deleting databases"),
        delete_db(TargetDb),
        delete_db(SourceDb),
        ok = timer:sleep(1000)
    end,
    lists:foreach(RunPair, Pairs),

    test_util:stop_couch(),
    ok.


% Stores a single document, "doc1", with one 6000-byte random attachment.
populate_db(Db) ->
    AttData = crypto:rand_bytes(6000),
    Attachment = #att{
        name = <<"doc1_att1">>,
        type = <<"application/foobar">>,
        att_len = byte_size(AttData),
        data = AttData
    },
    Doc1 = #doc{
        id = <<"doc1">>,
        atts = [Attachment]
    },
    {ok, _} = couch_db:update_doc(Db, Doc1, []).


% Updates every document of the database Times times (see db_fold_fun/2).
update_db_docs(DbName, Times) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    Visit = fun(FDI, _, Acc) -> db_fold_fun(FDI, Acc) end,
    {ok, _, _} = couch_db:enum_docs(Db, Visit, {DbName, Times}, []),
    ok = couch_db:close(Db).


% Fold callback: writes Times successive new versions of one document, each
% with a random "value" body and based on the revision returned by the
% previous update. The accumulator is passed through unchanged.
db_fold_fun(FullDocInfo, {DbName, Times}) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    {ok, Doc} = couch_db:open_doc(Db, FullDocInfo),
    Revs = Doc#doc.revs,
    StartRev = {element(1, Revs), hd(element(2, Revs))},
    Update = fun(_, {Pos, RevId}) ->
        {ok, Db2} = couch_db:reopen(Db),
        NewDocVersion = Doc#doc{
            revs = {Pos, [RevId]},
            body = {[{<<"value">>, base64:encode(crypto:rand_bytes(100))}]}
        },
        {ok, NewRev} = couch_db:update_doc(Db2, NewDocVersion, []),
        NewRev
    end,
    lists:foldl(Update, StartRev, lists:seq(1, Times)),
    ok = couch_db:close(Db),
    {ok, {DbName, Times}}.
% For every source document (opened with conflict information): same body
% and same JSON form on the target, same attachment names, and for each
% attachment the same encoding, content, lengths, type and MD5. An attachment
% named "att1" is expected to be gzip encoded, any other identity encoded.
compare_dbs(Source, Target) ->
    {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
    {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),
    OpenOpts = [conflicts, deleted_conflicts],

    CheckAtt = fun(#att{name = AttName} = Att, TargetAtts) ->
        etap:diag("Verifying attachment " ++ ?b2l(AttName)),

        {ok, AttTarget} = find_att(TargetAtts, AttName),
        SourceMd5 = att_md5(Att),
        TargetMd5 = att_md5(AttTarget),
        case AttName of
            <<"att1">> ->
                etap:is(Att#att.encoding, gzip,
                    "Attachment is gzip encoded in source database"),
                etap:is(AttTarget#att.encoding, gzip,
                    "Attachment is gzip encoded in target database"),
                DecSourceMd5 = att_decoded_md5(Att),
                DecTargetMd5 = att_decoded_md5(AttTarget),
                etap:is(DecTargetMd5, DecSourceMd5,
                    "Same identity content in source and target databases");
            _ ->
                etap:is(Att#att.encoding, identity,
                    "Attachment is not encoded in source database"),
                etap:is(AttTarget#att.encoding, identity,
                    "Attachment is not encoded in target database")
        end,
        etap:is(TargetMd5, SourceMd5,
            "Same content in source and target databases"),
        lists:foreach(
            fun({Len, Msg}) -> etap:is(is_integer(Len), true, Msg) end,
            [
                {Att#att.disk_len,
                    "#att.disk_len is an integer in source database"},
                {Att#att.att_len,
                    "#att.att_len is an integer in source database"},
                {AttTarget#att.disk_len,
                    "#att.disk_len is an integer in target database"},
                {AttTarget#att.att_len,
                    "#att.att_len is an integer in target database"}
            ]),
        lists:foreach(
            fun({Got, Expected, Msg}) -> etap:is(Got, Expected, Msg) end,
            [
                {Att#att.disk_len, AttTarget#att.disk_len,
                    "Same identity length in source and target databases"},
                {Att#att.att_len, AttTarget#att.att_len,
                    "Same encoded length in source and target databases"},
                {Att#att.type, AttTarget#att.type,
                    "Same type in source and target databases"},
                {Att#att.md5, SourceMd5, "Correct MD5 in source database"},
                {AttTarget#att.md5, SourceMd5, "Correct MD5 in target database"}
            ])
    end,

    CheckDoc = fun(FullDocInfo, _, Acc) ->
        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo, OpenOpts),
        Id = DocSource#doc.id,

        etap:diag("Verifying document " ++ ?b2l(Id)),

        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id, OpenOpts),
        etap:is(DocTarget#doc.body, DocSource#doc.body,
            "Same body in source and target databases"),

        etap:is(
            couch_doc:to_json_obj(DocTarget, []),
            couch_doc:to_json_obj(DocSource, []),
            "Same doc body in source and target databases"),

        SourceAtts = DocSource#doc.atts,
        TargetAtts = DocTarget#doc.atts,
        etap:is(
            lists:sort([N || #att{name = N} <- SourceAtts]),
            lists:sort([N || #att{name = N} <- TargetAtts]),
            "Document has same number (and names) of attachments in "
            "source and target databases"),

        lists:foreach(fun(Att) -> CheckAtt(Att, TargetAtts) end, SourceAtts),

        {ok, Acc}
    end,

    {ok, _, _} = couch_db:enum_docs(SourceDb, CheckDoc, [], []),
    ok = couch_db:close(SourceDb),
    ok = couch_db:close(TargetDb).


% Returns {ok, Att} for the first attachment with the given name, or nil.
find_att(Atts, Name) ->
    case [Att || #att{name = N} = Att <- Atts, N =:= Name] of
        [First | _] ->
            {ok, First};
        [] ->
            nil
    end.


% MD5 of the attachment data as folded by couch_doc:att_foldl/3.
att_md5(Att) ->
    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
    FinalCtx = couch_doc:att_foldl(Att, Feed, couch_util:md5_init()),
    couch_util:md5_final(FinalCtx).

% MD5 of the attachment data as folded by couch_doc:att_foldl_decode/3.
att_decoded_md5(Att) ->
    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
    FinalCtx = couch_doc:att_foldl_decode(Att, Feed, couch_util:md5_init()),
    couch_util:md5_final(FinalCtx).


% Builds "http://<bind_address>:<port>/<DbName>" for the running HTTP
% server, as a binary.
db_url(DbName) ->
    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
    Port = integer_to_list(mochiweb_socket_server:get(couch_http, port)),
    iolist_to_binary(["http://", Host, ":", Port, "/", DbName]).


% Creates (overwriting any existing one) the database as an admin and
% returns whatever couch_db:create/2 returns.
create_db(DbName) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]).


% Deletes the database behind the given handle, as an admin.
delete_db(Db) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    ok = couch_server:delete(couch_db:name(Db), [{user_ctx, AdminCtx}]).
- - -replicate({remote, Db}, Target) -> - replicate(db_url(Db), Target); - -replicate(Source, {remote, Db}) -> - replicate(Source, db_url(Db)); - -replicate(Source, Target) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc( - RepObject, #user_ctx{roles = [<<"_admin">>]}), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, Reason} -> - etap:is(Reason, normal, "Replication finished successfully") - after 300000 -> - etap:bail("Timeout waiting for replication to finish") - end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/test/07-use-checkpoints.t ---------------------------------------------------------------------- diff --git a/apps/couch_replicator/test/07-use-checkpoints.t b/apps/couch_replicator/test/07-use-checkpoints.t deleted file mode 100755 index 9f975e2..0000000 --- a/apps/couch_replicator/test/07-use-checkpoints.t +++ /dev/null @@ -1,255 +0,0 @@ -#!/usr/bin/env escript -%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap - -%% -*- erlang -*- -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -% Verify that compacting databases that are being used as the source or -% target of a replication doesn't affect the replication and that the -% replication doesn't hold their reference counters forever. - --define(b2l(B), binary_to_list(B)). 
- --record(user_ctx, { - name = null, - roles = [], - handler -}). - --record(doc, { - id = <<"">>, - revs = {0, []}, - body = {[]}, - atts = [], - deleted = false, - meta = [] -}). - --record(db, { - main_pid = nil, - update_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - updater_fd, - fd_ref_counter, - header = nil, - committed_update_seq, - fulldocinfo_by_id_btree, - docinfo_by_seq_btree, - local_docs_btree, - update_seq, - name, - filepath, - validate_doc_funs = [], - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - options = [], - compression, - before_doc_update, - after_doc_read -}). - --record(rep, { - id, - source, - target, - options, - user_ctx, - doc_id -}). - - -source_db_name() -> <<"couch_test_rep_db_a">>. -target_db_name() -> <<"couch_test_rep_db_b">>. - - -main(_) -> - etap:plan(16), - case (catch test()) of - ok -> - etap:end_tests(); - Other -> - etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), - etap:bail(Other) - end, - ok. - - -test() -> - test_util:start_couch(), - - test_use_checkpoints(false), - test_use_checkpoints(true), - - test_util:stop_couch(), - ok. 
- - -test_use_checkpoints(UseCheckpoints) -> - Pairs = [ - {source_db_name(), target_db_name()}, - {{remote, source_db_name()}, target_db_name()}, - {source_db_name(), {remote, target_db_name()}}, - {{remote, source_db_name()}, {remote, (target_db_name())}} - ], - - ListenerFun = case UseCheckpoints of - false -> - fun({finished, _, {CheckpointHistory}}) -> - etap:is(CheckpointHistory, - [{<<"use_checkpoints">>,false}], - "No checkpoints found"); - (_) -> - ok - end; - true -> - fun({finished, _, {CheckpointHistory}}) -> - SessionId = lists:keyfind( - <<"session_id">>, 1, CheckpointHistory), - etap:isnt(SessionId, false, "There's a checkpoint"); - (_) -> - ok - end - end, - {ok, Listener} = couch_replicator_notifier:start_link(ListenerFun), - - lists:foreach( - fun({Source, Target}) -> - {ok, SourceDb} = create_db(source_db_name()), - etap:diag("Populating source database"), - populate_db(SourceDb, 100), - ok = couch_db:close(SourceDb), - - etap:diag("Creating target database"), - {ok, TargetDb} = create_db(target_db_name()), - ok = couch_db:close(TargetDb), - - etap:diag("Setup replicator notifier listener"), - - etap:diag("Triggering replication"), - replicate(Source, Target, UseCheckpoints), - - etap:diag("Replication finished, comparing source and target databases"), - compare_dbs(SourceDb, TargetDb), - - etap:diag("Deleting databases"), - delete_db(TargetDb), - delete_db(SourceDb), - - ok = timer:sleep(1000) - end, - Pairs), - - couch_replicator_notifier:stop(Listener). - - -populate_db(Db, DocCount) -> - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]), - Value = iolist_to_binary(["val", integer_to_list(DocIdCounter)]), - Doc = #doc{ - id = Id, - body = {[ {<<"value">>, Value} ]} - }, - [Doc | Acc] - end, - [], lists:seq(1, DocCount)), - {ok, _} = couch_db:update_docs(Db, Docs, []). 
- - -compare_dbs(#db{name = SourceName}, #db{name = TargetName}) -> - {ok, SourceDb} = couch_db:open_int(SourceName, []), - {ok, TargetDb} = couch_db:open_int(TargetName, []), - Fun = fun(FullDocInfo, _, Acc) -> - {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), - {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - DocTarget = case couch_db:open_doc(TargetDb, DocId) of - {ok, DocT} -> - DocT; - Error -> - etap:bail("Error opening document '" ++ ?b2l(DocId) ++ - "' from target: " ++ couch_util:to_list(Error)) - end, - DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), - case DocTargetJson of - DocJson -> - ok; - _ -> - etap:bail("Content from document '" ++ ?b2l(DocId) ++ - "' differs in target database") - end, - {ok, Acc} - end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), - etap:diag("Target database has the same documents as the source database"), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). - - -db_url(DbName) -> - iolist_to_binary([ - "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_http, port)), - "/", DbName - ]). - - -create_db(DbName) -> - {ok, Db} = couch_db:create( - DbName, - [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]), - couch_db:close(Db), - {ok, Db}. - - -delete_db(#db{name = DbName, main_pid = Pid}) -> - ok = couch_server:delete( - DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _Reason} -> - ok - after 30000 -> - etap:bail("Timeout deleting database") - end. 
- - -replicate({remote, Db}, Target, UseCheckpoints) -> - replicate(db_url(Db), Target, UseCheckpoints); - -replicate(Source, {remote, Db}, UseCheckpoints) -> - replicate(Source, db_url(Db), UseCheckpoints); - -replicate(Source, Target, UseCheckpoints) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"use_checkpoints">>, UseCheckpoints} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc( - RepObject, #user_ctx{roles = [<<"_admin">>]}), - {ok, Pid} = couch_replicator:async_replicate(Rep), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, Reason} -> - etap:is(Reason, normal, "Replication finished successfully") - after 300000 -> - etap:bail("Timeout waiting for replication to finish") - end.