Added: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,483 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_copier).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/5]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% TODO: maybe make both buffer max sizes configurable
+-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets
+-define(DOC_BUFFER_LEN, 10). % for local targets, # of documents
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+-define(MAX_BULK_ATTS_PER_DOC, 8).
+
+-import(couch_replicator_utils, [
+ open_db/1,
+ close_db/1,
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1
+]).
+
+-record(batch, {
+ docs = [],
+ size = 0
+}).
+
+-record(state, {
+ loop,
+ cp,
+ max_parallel_conns,
+ source,
+ target,
+ readers = [],
+ writer = nil,
+ pending_fetch = nil,
+ flush_waiter = nil,
+ highest_seq_seen = ?LOWEST_SEQ,
+ stats = #rep_stats{},
+ source_db_compaction_notifier = nil,
+ target_db_compaction_notifier = nil,
+ batch = #batch{}
+}).
+
+
+
+start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) ->
+ Pid = spawn_link(
+ fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end),
+ {ok, Pid};
+
+start_link(Cp, Source, Target, MissingRevsQueue, MaxConns) ->
+ gen_server:start_link(
+ ?MODULE, {Cp, Source, Target, MissingRevsQueue, MaxConns}, []).
+
+
+init({Cp, Source, Target, MissingRevsQueue, MaxConns}) ->
+ process_flag(trap_exit, true),
+ Parent = self(),
+ LoopPid = spawn_link(
+ fun() -> queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) end
+ ),
+ State = #state{
+ cp = Cp,
+ max_parallel_conns = MaxConns,
+ loop = LoopPid,
+ source = open_db(Source),
+ target = open_db(Target),
+ source_db_compaction_notifier =
+ start_db_compaction_notifier(Source, self()),
+ target_db_compaction_notifier =
+ start_db_compaction_notifier(Target, self())
+ },
+ {ok, State}.
+
+
+handle_call({seq_done, Seq, RevCount}, {Pid, _},
+ #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) ->
+ NewState = State#state{
+ highest_seq_seen = lists:max([Seq, HighSeq]),
+ stats = Stats#rep_stats{
+ missing_checked = Stats#rep_stats.missing_checked + RevCount
+ }
+ },
+ {reply, ok, NewState};
+
+handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From,
+ #state{loop = Pid, readers = Readers, pending_fetch = nil,
+ highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt,
+ max_parallel_conns = MaxConns} = State) ->
+ Stats2 = Stats#rep_stats{
+ missing_checked = Stats#rep_stats.missing_checked + length(Revs),
+ missing_found = Stats#rep_stats.missing_found + length(Revs)
+ },
+ case length(Readers) of
+ Size when Size < MaxConns ->
+ Reader = spawn_doc_reader(Src, Tgt, Params),
+ NewState = State#state{
+ highest_seq_seen = lists:max([Seq, HighSeq]),
+ stats = Stats2,
+ readers = [Reader | Readers]
+ },
+ {reply, ok, NewState};
+ _ ->
+ NewState = State#state{
+ highest_seq_seen = lists:max([Seq, HighSeq]),
+ stats = Stats2,
+ pending_fetch = {From, Params}
+ },
+ {noreply, NewState}
+ end;
+
+handle_call({batch_doc, Doc}, From, State) ->
+ gen_server:reply(From, ok),
+ {noreply, maybe_flush_docs(Doc, State)};
+
+handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) ->
+ NewStats = Stats#rep_stats{
+ docs_read = Stats#rep_stats.docs_read + 1,
+ docs_written = Stats#rep_stats.docs_written + 1
+ },
+ {reply, ok, State#state{stats = NewStats}};
+
+handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) ->
+ NewStats = Stats#rep_stats{
+ docs_read = Stats#rep_stats.docs_read + 1,
+ doc_write_failures = Stats#rep_stats.doc_write_failures + 1
+ },
+ {reply, ok, State#state{stats = NewStats}};
+
+handle_call({add_write_stats, Written, Failed}, _From,
+ #state{stats = Stats} = State) ->
+ NewStats = Stats#rep_stats{
+ docs_written = Stats#rep_stats.docs_written + Written,
+ doc_write_failures = Stats#rep_stats.doc_write_failures + Failed
+ },
+ {reply, ok, State#state{stats = NewStats}};
+
+handle_call(flush, {Pid, _} = From,
+ #state{loop = Pid, writer = nil, flush_waiter = nil,
+ target = Target, batch = Batch} = State) ->
+ State2 = case State#state.readers of
+ [] ->
+ State#state{writer = spawn_writer(Target, Batch)};
+ _ ->
+ State
+ end,
+ {noreply, State2#state{flush_waiter = From}}.
+
+
+handle_cast({db_compacted, DbName},
+ #state{source = #db{name = DbName} = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+ #state{target = #db{name = DbName} = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_async_call, Msg}, State}.
+
+
+handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
+ #state{
+ batch = #batch{docs = []}, readers = [], writer = nil,
+ pending_fetch = nil, flush_waiter = nil
+ } = State,
+ {stop, normal, State};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
+ {noreply, after_full_flush(State)};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
+ #state{
+ readers = Readers, writer = Writer, batch = Batch,
+ source = Source, target = Target,
+ pending_fetch = Fetch, flush_waiter = FlushWaiter
+ } = State,
+ case Readers -- [Pid] of
+ Readers ->
+ {noreply, State};
+ Readers2 ->
+ State2 = case Fetch of
+ nil ->
+ case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso
+ (Readers2 =:= []) of
+ true ->
+ State#state{
+ readers = Readers2,
+ writer = spawn_writer(Target, Batch)
+ };
+ false ->
+ State#state{readers = Readers2}
+ end;
+ {From, FetchParams} ->
+ Reader = spawn_doc_reader(Source, Target, FetchParams),
+ gen_server:reply(From, ok),
+ State#state{
+ readers = [Reader | Readers2],
+ pending_fetch = nil
+ }
+ end,
+ {noreply, State2}
+ end;
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+ {stop, {process_died, Pid, Reason}, State}.
+
+
+terminate(_Reason, State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
+ stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) ->
+ case couch_work_queue:dequeue(MissingRevsQueue, 1) of
+ closed ->
+ ok;
+ {ok, [IdRevs]} ->
+ case Source of
+ #db{} ->
+ Source2 = open_db(Source),
+ Target2 = open_db(Target),
+ {Stats, HighSeqDone} = local_process_batch(
+ IdRevs, Source2, Target2, #batch{}, #rep_stats{}, ?LOWEST_SEQ),
+ close_db(Source2),
+ close_db(Target2),
+ ok = gen_server:cast(Parent, {report_seq_done, HighSeqDone, Stats}),
+ ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]);
+ #httpdb{} ->
+ remote_process_batch(IdRevs, Parent)
+ end,
+ queue_fetch_loop(Source, Target, Parent, MissingRevsQueue)
+ end.
+
+
+local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) ->
+ {Stats, HighestSeqDone};
+
+local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size},
+ Stats, HighestSeqDone) ->
+ case Target of
+ #httpdb{} ->
+ ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+ #db{} ->
+ ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
+ end,
+ {Written, WriteFailures} = flush_docs(Target, Docs),
+ Stats2 = Stats#rep_stats{
+ docs_written = Stats#rep_stats.docs_written + Written,
+ doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+ },
+ {Stats2, HighestSeqDone};
+
+local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest],
+ Source, Target, Batch, Stats, HighestSeqSeen) ->
+ {ok, DocList} = fetch_doc(
+ Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2, []),
+ {Batch2, Written, WriteFailures} = lists:foldl(
+ fun(Doc, {Batch0, W0, F0}) ->
+ {Batch1, W, F} = maybe_flush_docs(Target, Batch0, Doc),
+ {Batch1, W0 + W, F0 + F}
+ end,
+ {Batch, 0, 0}, DocList),
+ Stats2 = Stats#rep_stats{
+ missing_checked = Stats#rep_stats.missing_checked + length(Revs)
+ + NotMissingCount,
+ missing_found = Stats#rep_stats.missing_found + length(Revs),
+ docs_read = Stats#rep_stats.docs_read + length(DocList),
+ docs_written = Stats#rep_stats.docs_written + Written,
+ doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+ },
+ local_process_batch(
+ Rest, Source, Target, Batch2, Stats2, lists:max([Seq, HighestSeqSeen])).
+
+
+remote_process_batch([], Parent) ->
+ ok = gen_server:call(Parent, flush, infinity);
+
+remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) ->
+ case NotMissing > 0 of
+ true ->
+ ok = gen_server:call(Parent, {seq_done, Seq, NotMissing}, infinity);
+ false ->
+ ok
+ end,
+    % When the source is a remote database, we fetch a single document revision
+    % per HTTP request. This is mostly to make it easier to retry HTTP requests
+    % that fail due to transient network failures. It also helps to avoid
+    % exceeding the maximum URL length allowed by proxies and Mochiweb.
+ lists:foreach(
+ fun(Rev) ->
+ ok = gen_server:call(
+ Parent, {fetch_doc, {Id, [Rev], PAs, Seq}}, infinity)
+ end,
+ Revs),
+ remote_process_batch(Rest, Parent).
+
+
+spawn_doc_reader(Source, Target, FetchParams) ->
+ Parent = self(),
+ spawn_link(fun() ->
+ Source2 = open_db(Source),
+ fetch_doc(
+ Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
+ close_db(Source2)
+ end).
+
+
+fetch_doc(Source, {Id, Revs, PAs, _Seq}, DocHandler, Acc) ->
+ couch_api_wrap:open_doc_revs(
+ Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc).
+
+
+local_doc_handler({ok, Doc}, DocList) ->
+ [Doc | DocList];
+local_doc_handler(_, DocList) ->
+ DocList.
+
+
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+ ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
+ Acc;
+remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+    % Immediately flush documents with attachments received from a remote
+    % source. The data property of each attachment is a function that starts
+    % streaming the attachment data from the remote source, so it's best to
+    % call it as soon as possible to avoid ibrowse inactivity timeouts.
+ Target2 = open_db(Target),
+ Success = (flush_doc(Target2, Doc) =:= ok),
+ ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
+ close_db(Target2),
+ Acc;
+remote_doc_handler(_, Acc) ->
+ Acc.
+
+
+spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+ case {Target, Size > 0} of
+ {#httpdb{}, true} ->
+ ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+ {#db{}, true} ->
+ ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
+ _ ->
+ ok
+ end,
+ Parent = self(),
+ spawn_link(
+ fun() ->
+ Target2 = open_db(Target),
+ {Written, Failed} = flush_docs(Target2, DocList),
+ close_db(Target2),
+ ok = gen_server:call(
+ Parent, {add_write_stats, Written, Failed}, infinity)
+ end).
+
+
+after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter,
+ highest_seq_seen = HighSeqDone} = State) ->
+ ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}),
+ ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]),
+ gen_server:reply(Waiter, ok),
+ State#state{
+ stats = #rep_stats{},
+ flush_waiter = nil,
+ writer = nil,
+ batch = #batch{},
+ highest_seq_seen = ?LOWEST_SEQ
+ }.
+
+
+maybe_flush_docs(Doc, #state{target = Target, batch = Batch,
+ stats = Stats} = State) ->
+ {Batch2, W, F} = maybe_flush_docs(Target, Batch, Doc),
+ Stats2 = Stats#rep_stats{
+ docs_read = Stats#rep_stats.docs_read + 1,
+ docs_written = Stats#rep_stats.docs_written + W,
+ doc_write_failures = Stats#rep_stats.doc_write_failures + F
+ },
+ State#state{
+ stats = Stats2,
+ batch = Batch2
+ }.
+
+
+maybe_flush_docs(#httpdb{} = Target,
+ #batch{docs = DocAcc, size = SizeAcc} = Batch, #doc{atts = Atts} = Doc) ->
+ case (length(Atts) > ?MAX_BULK_ATTS_PER_DOC) orelse
+ lists:any(
+ fun(A) -> A#att.disk_len > ?MAX_BULK_ATT_SIZE end, Atts) of
+ true ->
+ ?LOG_DEBUG("Worker flushing doc with attachments", []),
+ case flush_doc(Target, Doc) of
+ ok ->
+ {Batch, 1, 0};
+ error ->
+ {Batch, 0, 1}
+ end;
+ false ->
+ JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+ case SizeAcc + iolist_size(JsonDoc) of
+ SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
+ ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
+ {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]),
+ {#batch{}, Written, Failed};
+ SizeAcc2 ->
+ {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, 0, 0}
+ end
+ end;
+
+maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc},
+ #doc{atts = []} = Doc) ->
+ case SizeAcc + 1 of
+ SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
+ ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
+ {Written, Failed} = flush_docs(Target, [Doc | DocAcc]),
+ {#batch{}, Written, Failed};
+ SizeAcc2 ->
+ {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, 0, 0}
+ end;
+
+maybe_flush_docs(#db{} = Target, Batch, Doc) ->
+ ?LOG_DEBUG("Worker flushing doc with attachments", []),
+ case flush_doc(Target, Doc) of
+ ok ->
+ {Batch, 1, 0};
+ error ->
+ {Batch, 0, 1}
+ end.
+
+
+flush_docs(_Target, []) ->
+ {0, 0};
+
+flush_docs(Target, DocList) when is_list(DocList) ->
+ {ok, Errors} = couch_api_wrap:update_docs(
+ Target, DocList, [delay_commit], replicated_changes),
+ DbUri = couch_api_wrap:db_uri(Target),
+ lists:foreach(
+ fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
+ ?LOG_ERROR("Replicator: unauthorized to write document"
+ " `~s` to `~s`", [Id, DbUri]);
+ (_) ->
+ ok
+ end, Errors),
+ {length(DocList) - length(Errors), length(Errors)}.
+
+flush_doc(Target, Doc) ->
+ case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+ {ok, _} ->
+ ok;
+ {error, <<"unauthorized">>} ->
+ ?LOG_ERROR("Replicator: unauthorized to write document `~s` to `~s`",
+ [Doc#doc.id, couch_api_wrap:db_uri(Target)]),
+ error;
+ _ ->
+ error
+ end.
Added: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,88 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_rev_finder).
+
+-export([start_link/5]).
+
+-include("couch_db.hrl").
+
+-import(couch_replicator_utils, [
+ open_db/1,
+ close_db/1
+]).
+
+
+start_link(Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize) ->
+ Pid = spawn_link(fun() ->
+ missing_revs_finder_loop(
+ Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize)
+ end),
+ {ok, Pid}.
+
+
+missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize) ->
+ case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+ closed ->
+ ok;
+ {ok, DocInfos} ->
+ #doc_info{high_seq = ReportSeq} = lists:last(DocInfos),
+ ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
+ ?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]),
+ IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
+ #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
+ Target2 = open_db(Target),
+ {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
+ close_db(Target2),
+ queue_missing_revs(Missing, DocInfos, RevsQueue),
+ missing_revs_finder_loop(Cp, Target2, ChangesQueue, RevsQueue, BatchSize)
+ end.
+
+
+queue_missing_revs(Missing, DocInfos, Queue) ->
+ IdRevsSeqDict = dict:from_list(
+ [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
+ #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
+ AllDict = lists:foldl(
+ fun({Id, MissingRevs, PAs}, Acc) ->
+ {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+ dict:store(Seq, {Id, MissingRevs, 0, PAs}, Acc)
+ end,
+ dict:new(), Missing),
+ AllDict2 = dict:fold(
+ fun(Id, {NotMissingRevs, Seq}, Acc) ->
+ case dict:find(Seq, Acc) of
+ error ->
+ dict:store(Seq, {Id, [], length(NotMissingRevs), []}, Acc);
+ {ok, {Id, MissingRevs, NotMissingCount, PAs}} ->
+ NotMissingCount2 = NotMissingCount + length(NotMissingRevs),
+ dict:store(Seq, {Id, MissingRevs, NotMissingCount2, PAs}, Acc)
+ end
+ end,
+ AllDict, non_missing(IdRevsSeqDict, Missing)),
+ ?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue",
+ [dict:size(AllDict2)]),
+ ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)).
+
+
+non_missing(NonMissingDict, []) ->
+ NonMissingDict;
+non_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
+ {AllRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+ case AllRevs -- MissingRevs of
+ [] ->
+ non_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+ NotMissing ->
+ non_missing(
+ dict:store(MissingId, {NotMissing, Seq}, IdRevsSeqDict),
+ Rest)
+ end.
Added: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,382 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_utils).
+
+-export([parse_rep_doc/2]).
+-export([update_rep_doc/2]).
+-export([ensure_rep_db_exists/0]).
+-export([open_db/1, close_db/1]).
+-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
+-export([replication_id/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+-include("couch_js_functions.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+
+parse_rep_doc({Props} = RepObj, UserCtx) ->
+ ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
+ Options = make_options(Props),
+ Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
+ Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
+ Rep = #rep{
+ source = Source,
+ target = Target,
+ options = Options,
+ user_ctx = UserCtx,
+ doc = RepObj
+ },
+ {ok, Rep#rep{id = replication_id(Rep)}}.
+
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+ case get_value(<<"_id">>, Props) of
+ undefined ->
+ ok;
+ RepDocId ->
+ {ok, RepDb} = ensure_rep_db_exists(),
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end,
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, _V} = KV, Body) ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ {Mega, Secs, _} = erlang:now(),
+ UnixTime = Mega * 1000000 + Secs,
+ lists:keystore(
+ <<"_replication_state_time">>, 1,
+ Body1, {<<"_replication_state_time">>, UnixTime});
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody,
+ KVs
+ ),
+    % This update may not succeed if the replication doc is deleted right
+    % before it takes place (which is not an error).
+ couch_db:update_doc(
+ RepDb,
+ RepDoc#doc{body = {NewRepDocBody}},
+ []).
+
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.
+
+
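+% For illustration only (the hash below is made up): for a continuous
+% replication that also sets create_target, replication_id/1 would return
+% something like {"a1b2c3...", "+continuous+create_target"}, while a plain
+% one-shot replication gets {"a1b2c3...", ""}.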
+replication_id(#rep{options = Options} = Rep) ->
+ BaseId = replication_id(Rep, ?REP_ID_VERSION),
+ {BaseId, maybe_append_options([continuous, create_target], Options)}.
+
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = mochiweb_socket_server:get(couch_httpd, port),
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Port, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
+maybe_append_filters(Base,
+ #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+ Base2 = Base ++
+ case get_value(filter, Options) of
+ undefined ->
+ case get_value(doc_ids, Options) of
+ undefined ->
+ [];
+ DocIds ->
+ [DocIds]
+ end;
+ Filter ->
+ [filter_code(Filter, Source, UserCtx),
+ get_value(query_params, Options, {[]})]
+ end,
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+
+filter_code(Filter, Source, UserCtx) ->
+ {match, [DDocName, FilterName]} =
+ re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
+ {ok, Db} = couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}]),
+ try
+ {ok, #doc{body = Body}} =
+ couch_api_wrap:open_doc(Db, <<"_design/", DDocName/binary>>, []),
+ Code = couch_util:get_nested_json_value(
+ Body, [<<"filters">>, FilterName]),
+ re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
+ after
+ couch_api_wrap:db_close(Db)
+ end.
+
+
+maybe_append_options(Options, RepOptions) ->
+ lists:foldl(fun(Option, Acc) ->
+ Acc ++
+ case get_value(Option, RepOptions, false) of
+ true ->
+ "+" ++ atom_to_list(Option);
+ false ->
+ ""
+ end
+ end, [], Options).
+
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ case OAuth of
+ nil ->
+ {remote, Url, Headers -- DefaultHeaders};
+ #oauth{} ->
+ {remote, Url, Headers -- DefaultHeaders, OAuth}
+ end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
+
+
+parse_rep_db({Props}, ProxyParams, Options) ->
+ Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+ {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+ Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ OAuth = case get_value(<<"oauth">>, AuthProps) of
+ undefined ->
+ nil;
+ {OauthProps} ->
+ #oauth{
+ consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+ token = ?b2l(get_value(<<"token">>, OauthProps)),
+ token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+ consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
+ signature_method =
+ case get_value(<<"signature_method">>, OauthProps) of
+ undefined -> hmac_sha1;
+ <<"PLAINTEXT">> -> plaintext;
+ <<"HMAC-SHA1">> -> hmac_sha1;
+ <<"RSA-SHA1">> -> rsa_sha1
+ end
+ }
+ end,
+ #httpdb{
+ url = Url,
+ oauth = OAuth,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+ ibrowse_options = lists:keysort(1,
+ [{socket_options, get_value(socket_options, Options)} |
+ ProxyParams ++ ssl_params(Url)]),
+ timeout = get_value(connection_timeout, Options),
+ http_connections = get_value(http_connections, Options),
+ http_pipeline_size = get_value(http_pipeline_size, Options)
+ };
+parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
+ DbName.
+
+
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+ maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+ case lists:last(Url) of
+ $/ ->
+ Url;
+ _ ->
+ Url ++ "/"
+ end.
+
+
+make_options(Props) ->
+ Options = lists:ukeysort(1, convert_options(Props)),
+ DefWorkers = couch_config:get("replicator", "worker_processes", "4"),
+ DefBatchSize = couch_config:get("replicator", "worker_batch_size", "1000"),
+ DefConns = couch_config:get("replicator", "http_connections", "20"),
+ DefPipeSize = couch_config:get("replicator", "http_pipeline_size", "50"),
+ DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
+ {ok, DefSocketOptions} = couch_util:parse_term(
+ couch_config:get("replicator", "socket_options",
+ "[{keepalive, true}, {nodelay, false}]")),
+ lists:ukeymerge(1, Options, [
+ {connection_timeout, list_to_integer(DefTimeout)},
+ {http_connections, list_to_integer(DefConns)},
+ {http_pipeline_size, list_to_integer(DefPipeSize)},
+ {socket_options, DefSocketOptions},
+ {worker_batch_size, list_to_integer(DefBatchSize)},
+ {worker_processes, list_to_integer(DefWorkers)}
+ ]).
+
+
+convert_options([])->
+ [];
+convert_options([{<<"cancel">>, V} | R]) ->
+ [{cancel, V} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | R]) ->
+ [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | R]) ->
+ [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+ [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+ [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    % Ensure the same behaviour as the old replicator: accept a list of
+    % percent-encoded doc IDs.
+ DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+ [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"worker_processes">>, V} | R]) ->
+ [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"worker_batch_size">>, V} | R]) ->
+ [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_connections">>, V} | R]) ->
+ [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_pipeline_size">>, V} | R]) ->
+ [{http_pipeline_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"connection_timeout">>, V} | R]) ->
+ [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"socket_options">>, V} | R]) ->
+ {ok, SocketOptions} = couch_util:parse_term(V),
+ [{socket_options, SocketOptions} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
+ convert_options(R).
+
+
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+ parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+ [];
+parse_proxy_params(ProxyUrl) ->
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd
+ } = ibrowse_lib:parse_url(ProxyUrl),
+ [{proxy_host, Host}, {proxy_port, Port}] ++
+ case is_list(User) andalso is_list(Passwd) of
+ false ->
+ [];
+ true ->
+ [{proxy_user, User}, {proxy_password, Passwd}]
+ end.
+
+
+ssl_params(Url) ->
+ case ibrowse_lib:parse_url(Url) of
+ #url{protocol = https} ->
+ Depth = list_to_integer(
+ couch_config:get("replicator", "ssl_certificate_max_depth", "3")
+ ),
+ VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
+ SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
+ [{is_ssl, true}, {ssl_options, SslOpts}];
+ #url{protocol = http} ->
+ []
+ end.
+
+ssl_verify_options(Value) ->
+ ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
+ [{verify, verify_none}];
+ssl_verify_options(true, _OTPVersion) ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, 2}, {cacertfile, CAFile}];
+ssl_verify_options(false, _OTPVersion) ->
+ [{verify, 0}].
+
+
+open_db(#db{name = Name, user_ctx = UserCtx, options = Options}) ->
+ {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | Options]),
+ Db;
+open_db(HttpDb) ->
+ HttpDb.
+
+
+close_db(#db{} = Db) ->
+ couch_db:close(Db);
+close_db(_HttpDb) ->
+ ok.
+
+
+start_db_compaction_notifier(#db{name = DbName}, Server) ->
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, {db_compacted, DbName});
+ (_) ->
+ ok
+ end),
+ Notifier;
+start_db_compaction_notifier(_, _) ->
+ nil.
+
+
+stop_db_compaction_notifier(nil) ->
+ ok;
+stop_db_compaction_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Wed Feb 16 20:05:31 2011
@@ -59,8 +59,8 @@ close(Wq) ->
init(Options) ->
Q = #q{
- max_size = couch_util:get_value(max_size, Options),
- max_items = couch_util:get_value(max_items, Options),
+ max_size = couch_util:get_value(max_size, Options, nil),
+ max_items = couch_util:get_value(max_items, Options, nil),
multi_workers = couch_util:get_value(multi_workers, Options, false)
},
{ok, Q}.
@@ -71,7 +71,7 @@ terminate(_Reason, #q{work_waiters=Worke
handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) ->
- Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)),
+ Q = Q0#q{size = increment_queue_size(Q0, Item),
items = Q0#q.items + 1,
queue = queue:in(Item, Q0#q.queue)},
case (Q#q.size >= Q#q.max_size) orelse
@@ -153,3 +153,10 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(X, Q) ->
{stop, X, Q}.
+
+increment_queue_size(#q{max_size = nil, size = Size}, _Item) ->
+ Size;
+increment_queue_size(#q{size = Size}, Item) when is_binary(Item) ->
+ Size + byte_size(Item);
+increment_queue_size(#q{size = Size}, Item) ->
+ Size + byte_size(term_to_binary(Item)).
Added: couchdb/trunk/src/couchdb/json_stream_parse.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/json_stream_parse.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/json_stream_parse.erl (added)
+++ couchdb/trunk/src/couchdb/json_stream_parse.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,432 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(json_stream_parse).
+
+
+-export([events/2, to_ejson/1, collect_object/2]).
+
+-define(IS_WS(X), (X == $\ orelse X == $\t orelse X == $\n orelse X == $\r)).
+-define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)).
+-define(IS_DIGIT(X), (X >= $0 andalso X =< $9)).
+
+
+
+% Parses the JSON into events.
+%
+% The DataFun param is a function that produces the data for parsing. When
+% called, it must yield a tuple or the atom done. The first element in the
+% tuple is the data itself, and the second element is a function to be called
+% next to get the next chunk of data in the stream.
+%
+% The EventFun is called every time a JSON element is parsed. It must produce
+% a new function to be called for the next event.
+%
+% Events happen each time a new element in the JSON string is parsed.
+% For simple value types, the data itself is returned:
+% Strings
+% Integers
+% Floats
+% true
+% false
+% null
+%
+% For arrays, the start of the array is signaled by the event array_start
+% atom. The end is signaled by array_end. The events before the end are the
+% values, or nested values.
+%
+% For objects, the start of the object is signaled by the event object_start
+% atom. The end is signaled by object_end. Each key is signaled by
+% {key, KeyString}, and the following event is the value, or start of the
+% value (array_start, object_start).
+%
+events(Data,EventFun) when is_list(Data)->
+ events(list_to_binary(Data),EventFun);
+events(Data,EventFun) when is_binary(Data)->
+ events(fun() -> {Data, fun() -> done end} end,EventFun);
+events(DataFun,EventFun) ->
+ parse_one(DataFun, EventFun, <<>>).
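+
+% A minimal illustrative EventFun (hypothetical print_events/1) that prints
+% every event as it is parsed; each call returns the function used for the
+% next event:
+%
+%   print_events(Ev) ->
+%       io:format("event: ~p~n", [Ev]),
+%       fun(Ev2) -> print_events(Ev2) end.
+%
+%   events(<<"[1, {\"a\": true}]">>, fun print_events/1).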
+
+% Converts the JSON directly to the Erlang representation of JSON (EJSON).
+to_ejson(DF) ->
+ {_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end),
+ [[EJson]] = make_ejson(EF(get_results), [[]]),
+ EJson.
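+
+% For example, to_ejson(<<"{\"foo\":\"bar\"}">>) returns
+% {[{<<"foo">>, <<"bar">>}]}, and to_ejson(<<"[1, true]">>) returns [1, true].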
+
+
+% This function is used to return complete objects while parsing streams.
+%
+% Return this function from inside an event function right after getting an
+% object_start event. It then collects the remaining events for that object
+% and converts them to the Erlang representation of JSON.
+%
+% It then calls your ReturnControl function with the Erlang object. Your
+% ReturnControl function should then yield another event function.
+%
+% This example stream parses an array of objects, calling
+% fun do_something_with_the_object/1 for each object.
+%
+% ev_array(array_start) ->
+% fun(Ev) -> ev_object_loop(Ev) end.
+%
+% ev_object_loop(object_start) ->
+% fun(Ev) ->
+% json_stream_parse:collect_object(Ev,
+% fun(Obj) ->
+% do_something_with_the_object(Obj),
+% fun(Ev2) -> ev_object_loop(Ev2) end
+% end)
+% end;
+% ev_object_loop(array_end) ->
+%     ok.
+%
+% % invoke the parse
+% main() ->
+% ...
+% events(Data, fun(Ev) -> ev_array(Ev) end).
+
+collect_object(Ev, ReturnControl) ->
+ collect_object(Ev, 0, ReturnControl, [object_start]).
+
+
+
+% internal methods
+
+parse_one(DF,EF,Acc) ->
+ case toke(DF, Acc) of
+ none ->
+ none;
+ {Token, DF2, Rest} ->
+ case Token of
+ "{" ->
+ EF2 = EF(object_start),
+ {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+ {DF3, EF3(object_end), Rest2};
+ "[" ->
+ EF2 = EF(array_start),
+ {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+ {DF3, EF3(array_end), Rest2};
+ Int when is_integer(Int)->
+ {DF2, EF(Int), Rest};
+ Float when is_float(Float)->
+ {DF2, EF(Float), Rest};
+ Atom when is_atom(Atom)->
+ {DF2, EF(Atom), Rest};
+ String when is_binary(String)->
+ {DF2, EF(String), Rest};
+ _OtherToken ->
+ err(unexpected_token)
+ end
+ end.
+
+must_parse_one(DF,EF,Acc,Error)->
+ case parse_one(DF, EF, Acc) of
+ none ->
+ err(Error);
+ Else ->
+ Else
+ end.
+
+must_toke(DF, Data, Error) ->
+ case toke(DF, Data) of
+ none ->
+ err(Error);
+ Result ->
+ Result
+ end.
+
+toke(DF, <<>>) ->
+ case DF() of
+ done ->
+ none;
+ {Data, DF2} ->
+ toke(DF2, Data)
+ end;
+toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
+ toke(DF, Rest);
+toke(DF, <<${,Rest/binary>>) ->
+ {"{", DF, Rest};
+toke(DF, <<$},Rest/binary>>) ->
+ {"}", DF, Rest};
+toke(DF, <<$[,Rest/binary>>) ->
+ {"[", DF, Rest};
+toke(DF, <<$],Rest/binary>>) ->
+ {"]", DF, Rest};
+toke(DF, <<$",Rest/binary>>) ->
+ toke_string(DF,Rest,[]);
+toke(DF, <<$,,Rest/binary>>) ->
+ {",", DF, Rest};
+toke(DF, <<$:,Rest/binary>>) ->
+ {":", DF, Rest};
+toke(DF, <<$-,Rest/binary>>) ->
+ {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
+ case ?IS_DIGIT(C) of
+ true ->
+ toke_number_leading(DF2, Data, "-");
+ false ->
+ err(expected_number)
+ end;
+toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
+ toke_number_leading(DF, Data, []);
+toke(DF, <<$t,Rest/binary>>) ->
+ {Data, DF2} = must_match(<<"rue">>, DF, Rest),
+ {true, DF2, Data};
+toke(DF, <<$f,Rest/binary>>) ->
+ {Data, DF2} = must_match(<<"alse">>, DF, Rest),
+ {false, DF2, Data};
+toke(DF, <<$n,Rest/binary>>) ->
+ {Data, DF2} = must_match(<<"ull">>, DF, Rest),
+ {null, DF2, Data};
+toke(_, _) ->
+ err(bad_token).
+
+
+must_match(Pattern, DF, Data) ->
+ Size = size(Pattern),
+ case must_df(DF, Size, Data, bad_token) of
+ {<<Pattern:Size/binary,Data2/binary>>, DF2} ->
+ {Data2, DF2};
+ {_, _} ->
+ err(bad_token)
+ end.
+
+must_df(DF,Error)->
+ case DF() of
+ done ->
+ err(Error);
+ {Data, DF2} ->
+ {Data, DF2}
+ end.
+
+
+must_df(DF,NeedLen,Acc,Error)->
+ if size(Acc) >= NeedLen ->
+ {Acc, DF};
+ true ->
+ case DF() of
+ done ->
+ err(Error);
+ {Data, DF2} ->
+ must_df(DF2, NeedLen, <<Acc/binary, Data/binary>>, Error)
+ end
+ end.
+
+
+parse_object(DF,EF,Acc) ->
+ case must_toke(DF, Acc, unterminated_object) of
+ {String, DF2, Rest} when is_binary(String)->
+ EF2 = EF({key,String}),
+ case must_toke(DF2,Rest,unterminated_object) of
+ {":", DF3, Rest2} ->
+ {DF4, EF3, Rest3} = must_parse_one(DF3, EF2, Rest2, expected_value),
+ case must_toke(DF4,Rest3, unterminated_object) of
+ {",", DF5, Rest4} ->
+ parse_object(DF5, EF3, Rest4);
+ {"}", DF5, Rest4} ->
+ {DF5, EF3, Rest4};
+ {_, _, _} ->
+ err(unexpected_token)
+ end;
+ _Else ->
+ err(expected_colon)
+ end;
+ {"}", DF2, Rest} ->
+ {DF2, EF, Rest};
+ {_, _, _} ->
+ err(unexpected_token)
+ end.
+
+parse_array0(DF,EF,Acc) ->
+ case toke(DF, Acc) of
+ none ->
+ err(unterminated_array);
+ {",", DF2, Rest} ->
+ parse_array(DF2,EF,Rest);
+ {"]", DF2, Rest} ->
+ {DF2,EF,Rest};
+ _ ->
+ err(unexpected_token)
+ end.
+
+parse_array(DF,EF,Acc) ->
+ case toke(DF, Acc) of
+ none ->
+ err(unterminated_array);
+ {Token, DF2, Rest} ->
+ case Token of
+ "{" ->
+ EF2 = EF(object_start),
+ {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+ parse_array0(DF3, EF3(object_end), Rest2);
+ "[" ->
+ EF2 = EF(array_start),
+ {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+ parse_array0(DF3, EF3(array_end), Rest2);
+ Int when is_integer(Int)->
+ parse_array0(DF2, EF(Int), Rest);
+ Float when is_float(Float)->
+ parse_array0(DF2, EF(Float), Rest);
+ Atom when is_atom(Atom)->
+ parse_array0(DF2, EF(Atom), Rest);
+ String when is_binary(String)->
+ parse_array0(DF2, EF(String), Rest);
+ "]" ->
+ {DF2, EF, Rest};
+ _ ->
+ err(unexpected_token)
+ end
+ end.
+
+
+toke_string(DF, <<>>, Acc) ->
+ {Data, DF2} = must_df(DF, unterminated_string),
+ toke_string(DF2, Data, Acc);
+toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$" | Acc]);
+toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\\ | Acc]);
+toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$/ | Acc]);
+toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\b | Acc]);
+toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\f | Acc]);
+toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\n | Acc]);
+toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\r | Acc]);
+toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [$\t | Acc]);
+toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
+ {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
+ UTFChar = erlang:list_to_integer([A, B, C, D], 16),
+ if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
+ err(invalid_utf_char);
+ true ->
+ ok
+ end,
+ Chars = xmerl_ucs:to_utf8(UTFChar),
+ toke_string(DF2, Data, lists:reverse(Chars) ++ Acc);
+toke_string(DF, <<$\\>>, Acc) ->
+ {Data, DF2} = must_df(DF, unterminated_string),
+ toke_string(DF2, <<$\\,Data/binary>>, Acc);
+toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
+ err(bad_escape);
+toke_string(DF, <<$", Rest/binary>>, Acc) ->
+ {list_to_binary(lists:reverse(Acc)), DF, Rest};
+toke_string(DF, <<C, Rest/binary>>, Acc) ->
+ toke_string(DF, Rest, [C | Acc]).
+
+
+toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
+ when ?IS_DIGIT(Digit) ->
+ toke_number_leading(DF, Rest, [Digit | Acc]);
+toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
+ when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+ {list_to_integer(lists:reverse(Acc)), DF, Rest};
+toke_number_leading(DF, <<>>, Acc) ->
+ case DF() of
+ done ->
+ {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
+ {Data, DF2} ->
+ toke_number_leading(DF2, Data, Acc)
+ end;
+toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
+ toke_number_trailing(DF, Rest, [$.|Acc]);
+toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
+ toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
+ toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(_, _, _) ->
+ err(unexpected_character_in_number).
+
+toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
+ when ?IS_DIGIT(Digit) ->
+ toke_number_trailing(DF, Rest, [Digit | Acc]);
+toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
+ when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+ {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_trailing(DF, <<>>, Acc) ->
+ case DF() of
+ done ->
+ {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+ {Data, DF2} ->
+ toke_number_trailing(DF2, Data, Acc)
+ end;
+toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+ toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+ toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(_, _, _) ->
+ err(unexpected_character_in_number).
+
+
+toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
+ toke_number_exponent(DF, Rest, [Digit | Acc]);
+toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
+ when Sign == $+ orelse Sign == $- ->
+ toke_number_exponent(DF, Rest, [Sign | Acc]);
+toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
+ when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+ {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_exponent(DF, <<>>, Acc) ->
+ case DF() of
+ done ->
+ {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+ {Data, DF2} ->
+ toke_number_exponent(DF2, Data, Acc)
+ end;
+toke_number_exponent(_, _, _) ->
+ err(unexpected_character_in_number).
+
+
+err(Error)->
+ throw({parse_error,Error}).
+
+
+make_ejson([], Stack) ->
+ Stack;
+make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
+ make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
+make_ejson([array_end | RevEvs], Stack) ->
+ make_ejson(RevEvs, [[] | Stack]);
+make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
+ make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
+make_ejson([object_end | RevEvs], Stack) ->
+ make_ejson(RevEvs, [[] | Stack]);
+make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack] = _Stack) ->
+ make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
+make_ejson([Value | RevEvs], [Vals | RestStack] = _Stack) ->
+ make_ejson(RevEvs, [[Value | Vals] | RestStack]).
+
+collect_events(get_results, Acc) ->
+ Acc;
+collect_events(Ev, Acc) ->
+ fun(NextEv) -> collect_events(NextEv, [Ev | Acc]) end.
+
+
+collect_object(object_end, 0, ReturnControl, Acc) ->
+ [[Obj]] = make_ejson([object_end | Acc], [[]]),
+ ReturnControl(Obj);
+collect_object(object_end, NestCount, ReturnControl, Acc) ->
+ fun(Ev) ->
+ collect_object(Ev, NestCount - 1, ReturnControl, [object_end | Acc])
+ end;
+collect_object(object_start, NestCount, ReturnControl, Acc) ->
+ fun(Ev) ->
+ collect_object(Ev, NestCount + 1, ReturnControl, [object_start | Acc])
+ end;
+collect_object(Ev, NestCount, ReturnControl, Acc) ->
+ fun(Ev2) ->
+ collect_object(Ev2, NestCount, ReturnControl, [Ev | Acc])
+ end.
Modified: couchdb/trunk/test/etap/001-load.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/001-load.t?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/test/etap/001-load.t (original)
+++ couchdb/trunk/test/etap/001-load.t Wed Feb 16 20:05:31 2011
@@ -17,7 +17,7 @@
main(_) ->
test_util:init_code_path(),
- etap:plan(37),
+ etap:plan(40),
Modules = [
couch_btree,
couch_config,
@@ -43,7 +43,10 @@ main(_) ->
couch_os_process,
couch_query_servers,
couch_ref_counter,
- couch_rep,
+ couch_replicator,
+ couch_replicator_doc_copier,
+ couch_replicator_rev_finder,
+ couch_replicator_utils,
couch_rep_sup,
couch_server,
couch_server_sup,
Added: couchdb/trunk/test/etap/190-json-stream-parse.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/190-json-stream-parse.t?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/test/etap/190-json-stream-parse.t (added)
+++ couchdb/trunk/test/etap/190-json-stream-parse.t Wed Feb 16 20:05:31 2011
@@ -0,0 +1,184 @@
+#!/usr/bin/env escript
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+ test_util:init_code_path(),
+ etap:plan(99),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag("Test died abnormally: ~p", [Other]),
+ etap:bail("Bad return value.")
+ end,
+ ok.
+
+test() ->
+ crypto:start(),
+ ok = test_raw_json_input(),
+ ok = test_1_byte_data_function(),
+ ok = test_multiple_bytes_data_function().
+
+
+test_raw_json_input() ->
+ etap:diag("Tests with raw JSON string as the input."),
+ lists:foreach(
+ fun({EJson, JsonString, Desc}) ->
+ etap:is(
+ equiv(EJson, json_stream_parse:to_ejson(JsonString)),
+ true,
+ Desc)
+ end,
+ cases()),
+ ok.
+
+
+test_1_byte_data_function() ->
+ etap:diag("Tests with a 1 byte output data function as the input."),
+ lists:foreach(
+ fun({EJson, JsonString, Desc}) ->
+ DataFun = fun() -> single_byte_data_fun(JsonString) end,
+ etap:is(
+ equiv(EJson, json_stream_parse:to_ejson(DataFun)),
+ true,
+ Desc)
+ end,
+ cases()),
+ ok.
+
+
+test_multiple_bytes_data_function() ->
+ etap:diag("Tests with a multiple bytes output data function as the input."),
+ lists:foreach(
+ fun({EJson, JsonString, Desc}) ->
+ DataFun = fun() -> multiple_bytes_data_fun(JsonString) end,
+ etap:is(
+ equiv(EJson, json_stream_parse:to_ejson(DataFun)),
+ true,
+ Desc)
+ end,
+ cases()),
+ ok.
+
+
+cases() ->
+ [
+ {1, "1", "integer numeric literial"},
+ {3.1416, "3.14160", "float numeric literal"}, % text representation may truncate, trail zeroes
+ {-1, "-1", "negative integer numeric literal"},
+ {-3.1416, "-3.14160", "negative float numeric literal"},
+ {12.0e10, "1.20000e+11", "float literal in scientific notation"},
+ {1.234E+10, "1.23400e+10", "another float literal in scientific notation"},
+ {-1.234E-10, "-1.23400e-10", "negative float literal in scientific notation"},
+ {10.0, "1.0e+01", "yet another float literal in scientific notation"},
+ {123.456, "1.23456E+2", "yet another float literal in scientific notation"},
+ {10.0, "1e1", "yet another float literal in scientific notation"},
+ {<<"foo">>, "\"foo\"", "string literal"},
+ {<<"foo", 5, "bar">>, "\"foo\\u0005bar\"", "string literal with \\u0005"},
+ {<<"">>, "\"\"", "empty string literal"},
+ {<<"\n\n\n">>, "\"\\n\\n\\n\"", "only new lines literal"},
+ {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\"",
+ "only white spaces string literal"},
+ {null, "null", "null literal"},
+ {true, "true", "true literal"},
+ {false, "false", "false literal"},
+ {<<"null">>, "\"null\"", "null string literal"},
+ {<<"true">>, "\"true\"", "true string literal"},
+ {<<"false">>, "\"false\"", "false string literal"},
+ {{[]}, "{}", "empty object literal"},
+ {{[{<<"foo">>, <<"bar">>}]}, "{\"foo\":\"bar\"}",
+ "simple object literal"},
+ {{[{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]},
+ "{\"foo\":\"bar\",\"baz\":123}", "another simple object literal"},
+ {[], "[]", "empty array literal"},
+ {[[]], "[[]]", "empty array literal inside a single element array literal"},
+ {[1, <<"foo">>], "[1,\"foo\"]", "simple non-empty array literal"},
+ {[1199344435545.0, 1], "[1199344435545.0,1]",
+ "another simple non-empty array literal"},
+ {[false, true, 321, null], "[false, true, 321, null]", "array of literals"},
+ {{[{<<"foo">>, [123]}]}, "{\"foo\":[123]}",
+ "object literal with an array valued property"},
+ {{[{<<"foo">>, {[{<<"bar">>, true}]}}]},
+ "{\"foo\":{\"bar\":true}}", "nested object literal"},
+ {{[{<<"foo">>, []}, {<<"bar">>, {[{<<"baz">>, true}]}},
+ {<<"alice">>, <<"bob">>}]},
+ "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}",
+ "complex object literal"},
+ {[-123, <<"foo">>, {[{<<"bar">>, []}]}, null],
+ "[-123,\"foo\",{\"bar\":[]},null]",
+ "complex array literal"}
+ ].
+
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as Erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+equiv({Props1}, {Props2}) ->
+ equiv_object(Props1, Props2);
+equiv(L1, L2) when is_list(L1), is_list(L2) ->
+ equiv_list(L1, L2);
+equiv(N1, N2) when is_number(N1), is_number(N2) ->
+ N1 == N2;
+equiv(B1, B2) when is_binary(B1), is_binary(B2) ->
+ B1 == B2;
+equiv(true, true) ->
+ true;
+equiv(false, false) ->
+ true;
+equiv(null, null) ->
+ true.
+
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+equiv_object(Props1, Props2) ->
+ L1 = lists:keysort(1, Props1),
+ L2 = lists:keysort(1, Props2),
+ Pairs = lists:zip(L1, L2),
+ true = lists:all(
+ fun({{K1, V1}, {K2, V2}}) ->
+ equiv(K1, K2) andalso equiv(V1, V2)
+ end,
+ Pairs).
+
+
+%% Recursively compare list elements for equivalence.
+equiv_list([], []) ->
+ true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+ equiv(V1, V2) andalso equiv_list(L1, L2).
+
+
+single_byte_data_fun([]) ->
+ done;
+single_byte_data_fun([H | T]) ->
+ {<<H>>, fun() -> single_byte_data_fun(T) end}.
+
+
+multiple_bytes_data_fun([]) ->
+ done;
+multiple_bytes_data_fun(L) ->
+ N = crypto:rand_uniform(0, 7),
+ {Part, Rest} = split(L, N),
+ {list_to_binary(Part), fun() -> multiple_bytes_data_fun(Rest) end}.
+
+split(L, N) when length(L) =< N ->
+ {L, []};
+split(L, N) ->
+ take(N, L, []).
+
+take(0, L, Acc) ->
+ {lists:reverse(Acc), L};
+take(N, [H|L], Acc) ->
+ take(N - 1, L, [H | Acc]).
Modified: couchdb/trunk/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/Makefile.am?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/test/etap/Makefile.am (original)
+++ couchdb/trunk/test/etap/Makefile.am Wed Feb 16 20:05:31 2011
@@ -57,10 +57,6 @@ EXTRA_DIST = \
083-config-no-files.t \
090-task-status.t \
100-ref-counter.t \
- 110-replication-httpc.t \
- 111-replication-changes-feed.t \
- 112-replication-missing-revs.t \
- 113-replication-attachment-comp.t \
120-stats-collect.t \
121-stats-aggregates.cfg \
121-stats-aggregates.ini \
@@ -81,4 +77,5 @@ EXTRA_DIST = \
173-os-daemon-cfg-register.es \
173-os-daemon-cfg-register.t \
180-http-proxy.ini \
- 180-http-proxy.t
+ 180-http-proxy.t \
+ 190-json-stream-parse.t