Added: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1071375&view=auto ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (added) +++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Wed Feb 16 20:05:31 2011 @@ -0,0 +1,483 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_doc_copier). +-behaviour(gen_server). + +% public API +-export([start_link/5]). + +% gen_server callbacks +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). + +-include("couch_db.hrl"). +-include("couch_api_wrap.hrl"). +-include("couch_replicator.hrl"). + +% TODO: maybe make both buffer max sizes configurable +-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets +-define(DOC_BUFFER_LEN, 10). % for local targets, # of documents +-define(MAX_BULK_ATT_SIZE, 64 * 1024). +-define(MAX_BULK_ATTS_PER_DOC, 8). + +-import(couch_replicator_utils, [ + open_db/1, + close_db/1, + start_db_compaction_notifier/2, + stop_db_compaction_notifier/1 +]). + +-record(batch, { + docs = [], + size = 0 +}). 
% State of the copier gen_server, which is only started when the source
% database is remote (see start_link/5):
%   loop          - pid of the process running queue_fetch_loop/4
%   cp            - process that receives the {report_seq_done, ...} casts
%   readers       - pids of the doc reader processes currently running
%   writer        - pid of the process flushing the doc batch, or nil
%   pending_fetch - {From, FetchParams} of a fetch_doc call that is parked
%                   until a reader slot becomes free, or nil
%   flush_waiter  - From of the flush call waiting for a full flush, or nil
-record(state, {
    loop,
    cp,
    max_parallel_conns,
    source,
    target,
    readers = [],
    writer = nil,
    pending_fetch = nil,
    flush_waiter = nil,
    highest_seq_seen = ?LOWEST_SEQ,
    stats = #rep_stats{},
    source_db_compaction_notifier = nil,
    target_db_compaction_notifier = nil,
    batch = #batch{}
}).


% Starts a copier worker.
%
% For a local source (#db{}) a plain linked process running
% queue_fetch_loop/4 is enough and it reports directly to Cp. For any other
% source a gen_server is started, which in turn spawns the fetch loop.
start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) ->
    Loop = fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end,
    {ok, spawn_link(Loop)};

start_link(Cp, Source, Target, MissingRevsQueue, MaxConns) ->
    InitArgs = {Cp, Source, Target, MissingRevsQueue, MaxConns},
    gen_server:start_link(?MODULE, InitArgs, []).


% gen_server callback. Traps exits so that the termination of the linked
% fetch loop, readers and writer shows up in handle_info/2.
init({Cp, Source, Target, MissingRevsQueue, MaxConns}) ->
    process_flag(trap_exit, true),
    Server = self(),
    LoopPid = spawn_link(fun() ->
        queue_fetch_loop(Source, Target, Server, MissingRevsQueue)
    end),
    SourceDb = open_db(Source),
    TargetDb = open_db(Target),
    SourceNotifier = start_db_compaction_notifier(Source, Server),
    TargetNotifier = start_db_compaction_notifier(Target, Server),
    {ok, #state{
        cp = Cp,
        max_parallel_conns = MaxConns,
        loop = LoopPid,
        source = SourceDb,
        target = TargetDb,
        source_db_compaction_notifier = SourceNotifier,
        target_db_compaction_notifier = TargetNotifier
    }}.


% Synchronous requests. They are sent by the fetch loop process
% (#state.loop), by the doc readers and by the writer.

% The fetch loop reports revisions that did not need to be fetched.
handle_call({seq_done, Seq, RevCount}, {Pid, _},
    #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) ->
    Checked = Stats#rep_stats.missing_checked + RevCount,
    {reply, ok, State#state{
        highest_seq_seen = lists:max([Seq, HighSeq]),
        stats = Stats#rep_stats{missing_checked = Checked}
    }};

% The fetch loop asks for a document to be fetched. While fewer than
% max_parallel_conns readers are running a new reader is spawned and the
% caller is released immediately; otherwise the request is parked in
% pending_fetch and answered when a reader exits (see handle_info/2).
handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From,
    #state{loop = Pid, readers = Readers, pending_fetch = nil,
        highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt,
        max_parallel_conns = MaxConns} = State) ->
    RevCount = length(Revs),
    State1 = State#state{
        highest_seq_seen = lists:max([Seq, HighSeq]),
        stats = Stats#rep_stats{
            missing_checked = Stats#rep_stats.missing_checked + RevCount,
            missing_found = Stats#rep_stats.missing_found + RevCount
        }
    },
    case length(Readers) < MaxConns of
    true ->
        Reader = spawn_doc_reader(Src, Tgt, Params),
        {reply, ok, State1#state{readers = [Reader | Readers]}};
    false ->
        {noreply, State1#state{pending_fetch = {From, Params}}}
    end;

% A reader hands over a document for batching. The reply is sent before the
% (possibly flushing) batch update is done.
handle_call({batch_doc, Doc}, From, State) ->
    gen_server:reply(From, ok),
    {noreply, maybe_flush_docs(Doc, State)};

% A reader wrote a document on its own and reports whether that worked.
handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) ->
    #rep_stats{docs_read = Read, docs_written = Written} = Stats,
    Stats2 = Stats#rep_stats{docs_read = Read + 1, docs_written = Written + 1},
    {reply, ok, State#state{stats = Stats2}};

handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) ->
    #rep_stats{docs_read = Read, doc_write_failures = Failures} = Stats,
    Stats2 = Stats#rep_stats{
        docs_read = Read + 1,
        doc_write_failures = Failures + 1
    },
    {reply, ok, State#state{stats = Stats2}};

% The writer reports the outcome of a batch flush.
handle_call({add_write_stats, Written, Failed}, _From,
    #state{stats = Stats} = State) ->
    Stats2 = Stats#rep_stats{
        docs_written = Stats#rep_stats.docs_written + Written,
        doc_write_failures = Stats#rep_stats.doc_write_failures + Failed
    },
    {reply, ok, State#state{stats = Stats2}};

% The fetch loop is done with a batch of the queue. The writer is started
% right away when no reader is running; otherwise the exit of the last
% reader starts it. The caller is answered from after_full_flush/1.
handle_call(flush, {Pid, _} = From,
    #state{loop = Pid, writer = nil, flush_waiter = nil, readers = Readers,
        target = Target, batch = Batch} = State) ->
    Writer = case Readers of
    [] ->
        spawn_writer(Target, Batch);
    _ ->
        nil
    end,
    {noreply, State#state{writer = Writer, flush_waiter = From}}.


% Compaction notifications: reopen the handle of the compacted database.
% Any other cast stops the server.
handle_cast({db_compacted, DbName},
    #state{source = #db{name = DbName} = Source} = State) ->
    {ok, Reopened} = couch_db:reopen(Source),
    {noreply, State#state{source = Reopened}};

handle_cast({db_compacted, DbName},
    #state{target = #db{name = DbName} = Target} = State) ->
    {ok, Reopened} = couch_db:reopen(Target),
    {noreply, State#state{target = Reopened}};

handle_cast(Msg, State) ->
    {stop, {unexpected_async_call, Msg}, State}.


% The fetch loop finished. Nothing may be left pending at this point; the
% match below crashes the server otherwise.
handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
    #state{
        batch = #batch{docs = []}, readers = [], writer = nil,
        pending_fetch = nil, flush_waiter = nil
    } = State,
    {stop, normal, State};

% The writer finished flushing the batch.
handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
    {noreply, after_full_flush(State)};

% Some other linked process finished while no writer is running. If it was a
% reader, either hand its slot to the parked fetch request or, when a flush
% is awaited and this was the last reader, start the writer.
handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
    #state{
        readers = Readers, batch = Batch,
        source = Source, target = Target,
        pending_fetch = Fetch, flush_waiter = FlushWaiter
    } = State,
    case Readers -- [Pid] of
    Readers ->
        % not one of our readers
        {noreply, State};
    Remaining ->
        NewState = case Fetch of
        nil when FlushWaiter =/= nil, Remaining =:= [] ->
            State#state{
                readers = Remaining,
                writer = spawn_writer(Target, Batch)
            };
        nil ->
            State#state{readers = Remaining};
        {From, FetchParams} ->
            Reader = spawn_doc_reader(Source, Target, FetchParams),
            gen_server:reply(From, ok),
            State#state{
                readers = [Reader | Remaining],
                pending_fetch = nil
            }
        end,
        {noreply, NewState}
    end;

% Every other exit signal takes the server down.
handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.


% Releases the database handles and stops the compaction notifiers.
terminate(_Reason, #state{source = Source, target = Target} = State) ->
    close_db(Source),
    close_db(Target),
    stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
    stop_db_compaction_notifier(State#state.target_db_compaction_notifier).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Takes one item at a time from the missing revs queue until it is closed.
% With a local source the batch is processed in this process and the result
% is cast to Parent; with a remote source the work is delegated to Parent
% through gen_server calls.
queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) ->
    case couch_work_queue:dequeue(MissingRevsQueue, 1) of
    closed ->
        ok;
    {ok, [IdRevs]} ->
        case Source of
        #httpdb{} ->
            remote_process_batch(IdRevs, Parent);
        #db{} ->
            SourceDb = open_db(Source),
            TargetDb = open_db(Target),
            {Stats, HighSeqDone} = local_process_batch(
                IdRevs, SourceDb, TargetDb, #batch{}, #rep_stats{},
                ?LOWEST_SEQ),
            close_db(SourceDb),
            close_db(TargetDb),
            ok = gen_server:cast(
                Parent, {report_seq_done, HighSeqDone, Stats}),
            ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone])
        end,
        queue_fetch_loop(Source, Target, Parent, MissingRevsQueue)
    end.


% Copies a batch from a local source. Returns {Stats, HighestSeq} once the
% batch has been processed and the remaining buffered docs were flushed.
local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) ->
    {Stats, HighestSeqDone};

local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size},
    Stats, HighestSeqDone) ->
    % the meaning of the batch size depends on the kind of target
    case Target of
    #db{} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
    #httpdb{} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size])
    end,
    {Written, WriteFailures} = flush_docs(Target, Docs),
    #rep_stats{docs_written = W0, doc_write_failures = F0} = Stats,
    FinalStats = Stats#rep_stats{
        docs_written = W0 + Written,
        doc_write_failures = F0 + WriteFailures
    },
    {FinalStats, HighestSeqDone};

local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest],
    Source, Target, Batch, Stats, HighestSeqSeen) ->
    {ok, DocList} = fetch_doc(
        Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2, []),
    AddDoc = fun(Doc, {BatchAcc, WrittenAcc, FailedAcc}) ->
        {NewBatch, W, F} = maybe_flush_docs(Target, BatchAcc, Doc),
        {NewBatch, WrittenAcc + W, FailedAcc + F}
    end,
    {Batch2, Written, WriteFailures} =
        lists:foldl(AddDoc, {Batch, 0, 0}, DocList),
    RevCount = length(Revs),
    Stats2 = Stats#rep_stats{
        missing_checked =
            Stats#rep_stats.missing_checked + RevCount + NotMissingCount,
        missing_found = Stats#rep_stats.missing_found + RevCount,
        docs_read = Stats#rep_stats.docs_read + length(DocList),
        docs_written = Stats#rep_stats.docs_written + Written,
        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
    },
    HighestSeq = lists:max([Seq, HighestSeqSeen]),
    local_process_batch(Rest, Source, Target, Batch2, Stats2, HighestSeq).


% Copies a batch from a remote source by driving the gen_server (Parent).
% When the batch is exhausted a flush is requested; the call returns once
% the flush has completed.
remote_process_batch([], Parent) ->
    ok = gen_server:call(Parent, flush, infinity);

remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) ->
    case NotMissing > 0 of
    false ->
        ok;
    true ->
        ok = gen_server:call(Parent, {seq_done, Seq, NotMissing}, infinity)
    end,
    % When the source is a remote database, we fetch a single document revision
    % per HTTP request. This is mostly to facilitate retrying of HTTP requests
    % due to network transient failures. It also helps not exceeding the maximum
    % URL length allowed by proxies and Mochiweb.
    FetchRev = fun(Rev) ->
        ok = gen_server:call(
            Parent, {fetch_doc, {Id, [Rev], PAs, Seq}}, infinity)
    end,
    lists:foreach(FetchRev, Revs),
    remote_process_batch(Rest, Parent).


% Spawns a linked process that fetches the requested revisions and feeds
% each of them to remote_doc_handler/2.
spawn_doc_reader(Source, Target, FetchParams) ->
    Server = self(),
    Reader = fun() ->
        SourceDb = open_db(Source),
        fetch_doc(
            SourceDb, FetchParams, fun remote_doc_handler/2, {Server, Target}),
        close_db(SourceDb)
    end,
    spawn_link(Reader).


% Opens the given revisions of a document, passing the ancestors the target
% already has (atts_since) along with the request.
fetch_doc(Source, {Id, Revs, PAs, _Seq}, DocHandler, Acc) ->
    Options = [{atts_since, PAs}],
    couch_api_wrap:open_doc_revs(Source, Id, Revs, Options, DocHandler, Acc).


% Accumulates the successfully opened docs; anything else is dropped.
local_doc_handler({ok, Doc}, DocList) ->
    [Doc | DocList];
local_doc_handler(_, DocList) ->
    DocList.


% Docs without attachments are handed to the gen_server for batching.
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
    ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
    Acc;
remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
    % Immediately flush documents with attachments received from a remote
    % source. The data property of each attachment is a function that starts
    % streaming the attachment data from the remote source, therefore it's
    % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
    TargetDb = open_db(Target),
    Flushed = (flush_doc(TargetDb, Doc) =:= ok),
    ok = gen_server:call(Parent, {doc_flushed, Flushed}, infinity),
    close_db(TargetDb),
    Acc;
remote_doc_handler(_, Acc) ->
    Acc.


% Spawns a linked process that flushes the batch to the target and reports
% the write counts back to the calling gen_server. Its normal exit triggers
% after_full_flush/1.
spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
    % nothing is logged for an empty batch
    case {Size > 0, Target} of
    {true, #httpdb{}} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
    {true, #db{}} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
    _ ->
        ok
    end,
    Server = self(),
    Writer = fun() ->
        TargetDb = open_db(Target),
        {Written, Failed} = flush_docs(TargetDb, DocList),
        close_db(TargetDb),
        ok = gen_server:call(
            Server, {add_write_stats, Written, Failed}, infinity)
    end,
    spawn_link(Writer).


% Reports the finished sequence and the collected stats to the checkpoint
% process, releases the process waiting for the flush and resets the
% per-batch part of the state.
after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter,
        highest_seq_seen = HighSeqDone} = State) ->
    ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}),
    ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]),
    gen_server:reply(Waiter, ok),
    State#state{
        highest_seq_seen = ?LOWEST_SEQ,
        stats = #rep_stats{},
        batch = #batch{},
        writer = nil,
        flush_waiter = nil
    }.


% Adds a doc read by a reader to the batch kept in the state (flushing it if
% maybe_flush_docs/3 decides so) and updates the stats accordingly.
maybe_flush_docs(Doc, #state{target = Target, batch = Batch,
        stats = Stats} = State) ->
    {NewBatch, Written, Failed} = maybe_flush_docs(Target, Batch, Doc),
    #rep_stats{
        docs_read = Read0,
        docs_written = Written0,
        doc_write_failures = Failed0
    } = Stats,
    State#state{
        batch = NewBatch,
        stats = Stats#rep_stats{
            docs_read = Read0 + 1,
            docs_written = Written0 + Written,
            doc_write_failures = Failed0 + Failed
        }
    }.


% Adds a doc to the batch, or writes it (or the whole batch) to the target.
% Returns {NewBatch, DocsWritten, DocWriteFailures}.
%
% Remote target: docs with more than ?MAX_BULK_ATTS_PER_DOC attachments, or
% with an attachment larger than ?MAX_BULK_ATT_SIZE, are written on their
% own. All others are buffered as JSON until the buffer grows beyond
% ?DOC_BUFFER_BYTE_SIZE bytes.
maybe_flush_docs(#httpdb{} = Target,
    #batch{docs = DocAcc, size = SizeAcc} = Batch, #doc{atts = Atts} = Doc) ->
    IsBigAtt = fun(Att) -> Att#att.disk_len > ?MAX_BULK_ATT_SIZE end,
    FlushAlone = (length(Atts) > ?MAX_BULK_ATTS_PER_DOC) orelse
        lists:any(IsBigAtt, Atts),
    case FlushAlone of
    true ->
        ?LOG_DEBUG("Worker flushing doc with attachments", []),
        case flush_doc(Target, Doc) of
        ok ->
            {Batch, 1, 0};
        error ->
            {Batch, 0, 1}
        end;
    false ->
        JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
        NewSize = SizeAcc + iolist_size(JsonDoc),
        NewDocs = [JsonDoc | DocAcc],
        case NewSize > ?DOC_BUFFER_BYTE_SIZE of
        true ->
            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [NewSize]),
            {Written, Failed} = flush_docs(Target, NewDocs),
            {#batch{}, Written, Failed};
        false ->
            {#batch{docs = NewDocs, size = NewSize}, 0, 0}
        end
    end;

% Local target, doc without attachments: buffer it until the batch holds
% ?DOC_BUFFER_LEN docs.
maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc},
    #doc{atts = []} = Doc) ->
    DocCount = SizeAcc + 1,
    NewDocs = [Doc | DocAcc],
    case DocCount >= ?DOC_BUFFER_LEN of
    true ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [DocCount]),
        {Written, Failed} = flush_docs(Target, NewDocs),
        {#batch{}, Written, Failed};
    false ->
        {#batch{docs = NewDocs, size = DocCount}, 0, 0}
    end;

% Local target, doc with attachments: always written on its own.
maybe_flush_docs(#db{} = Target, Batch, Doc) ->
    ?LOG_DEBUG("Worker flushing doc with attachments", []),
    case flush_doc(Target, Doc) of
    ok ->
        {Batch, 1, 0};
    error ->
        {Batch, 0, 1}
    end.


% Writes a list of docs with a single bulk update.
% Returns {DocsWritten, DocWriteFailures}; "unauthorized" errors are logged.
flush_docs(_Target, []) ->
    {0, 0};

flush_docs(Target, DocList) when is_list(DocList) ->
    {ok, Errors} = couch_api_wrap:update_docs(
        Target, DocList, [delay_commit], replicated_changes),
    DbUri = couch_api_wrap:db_uri(Target),
    LogUnauthorized =
        fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
            ?LOG_ERROR("Replicator: unauthorized to write document"
                " `~s` to `~s`", [Id, DbUri]);
        (_) ->
            ok
        end,
    lists:foreach(LogUnauthorized, Errors),
    FailedCount = length(Errors),
    {length(DocList) - FailedCount, FailedCount}.

% Writes a single doc to the target. Returns ok | error; an "unauthorized"
% rejection is logged, every other failure is reported silently.
flush_doc(Target, Doc) ->
    Result = couch_api_wrap:update_doc(Target, Doc, [], replicated_changes),
    case Result of
    {ok, _} ->
        ok;
    {error, <<"unauthorized">>} ->
        ?LOG_ERROR("Replicator: unauthorized to write document `~s` to `~s`",
            [Doc#doc.id, couch_api_wrap:db_uri(Target)]),
        error;
    _ ->
        error
    end.

Added: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,88 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_rev_finder).

-export([start_link/5]).

-include("couch_db.hrl").

-import(couch_replicator_utils, [
    open_db/1,
    close_db/1
]).


% Spawns a linked process running missing_revs_finder_loop/5.
start_link(Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize) ->
    Finder = fun() ->
        missing_revs_finder_loop(
            Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize)
    end,
    {ok, spawn_link(Finder)}.


% Takes batches of up to BatchSize #doc_info{} records from the changes
% queue until it is closed. For each batch it reports the high_seq of the
% last doc info to Cp, asks the target which revisions it is missing and
% puts the result on the missing revs queue.
missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize) ->
    case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
    closed ->
        ok;
    {ok, DocInfos} ->
        #doc_info{high_seq = ReportSeq} = lists:last(DocInfos),
        ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
        ?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]),
        IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
            #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
        Target2 = open_db(Target),
        {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
        close_db(Target2),
        queue_missing_revs(Missing, DocInfos, RevsQueue),
        % Continue with the handle we were given: Target2 has just been
        % closed and must not be carried into the next iteration.
        missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize)
    end.


% Queues one list holding an entry
%   {Seq, {Id, MissingRevs, NotMissingCount, PossibleAncestors}}
% per sequence of the batch. Docs the target is not missing anything of get
% an entry with no missing revs, so that their revisions are still counted.
queue_missing_revs(Missing, DocInfos, Queue) ->
    % Id -> {AllRevs, Seq} for every doc info of the batch
    IdRevsSeqDict = dict:from_list(
        [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
            #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
    % Seq -> entry for every doc with missing revisions
    AllDict = lists:foldl(
        fun({Id, MissingRevs, PAs}, Acc) ->
            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
            dict:store(Seq, {Id, MissingRevs, 0, PAs}, Acc)
        end,
        dict:new(), Missing),
    % add the counts of the revisions that are not missing
    AllDict2 = dict:fold(
        fun(Id, {NotMissingRevs, Seq}, Acc) ->
            case dict:find(Seq, Acc) of
            error ->
                dict:store(Seq, {Id, [], length(NotMissingRevs), []}, Acc);
            {ok, {Id, MissingRevs, NotMissingCount, PAs}} ->
                NotMissingCount2 = NotMissingCount + length(NotMissingRevs),
                dict:store(Seq, {Id, MissingRevs, NotMissingCount2, PAs}, Acc)
            end
        end,
        AllDict, non_missing(IdRevsSeqDict, Missing)),
    ?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue",
        [dict:size(AllDict2)]),
    ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)).


% Reduces the Id -> {Revs, Seq} dict to the revisions that were NOT reported
% as missing: the missing revs of each reported doc are removed and docs left
% without any revision are dropped from the dict.
non_missing(NonMissingDict, []) ->
    NonMissingDict;
non_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
    {AllRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
    Dict2 = case AllRevs -- MissingRevs of
    [] ->
        dict:erase(MissingId, IdRevsSeqDict);
    NotMissing ->
        dict:store(MissingId, {NotMissing, Seq}, IdRevsSeqDict)
    end,
    non_missing(Dict2, Rest).

Added: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,382 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_utils).

-export([parse_rep_doc/2]).
-export([update_rep_doc/2]).
-export([ensure_rep_db_exists/0]).
-export([open_db/1, close_db/1]).
-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
-export([replication_id/2]).

-include("couch_db.hrl").
-include("couch_api_wrap.hrl").
-include("couch_replicator.hrl").
-include("couch_js_functions.hrl").
-include("../ibrowse/ibrowse.hrl").

-import(couch_util, [
    get_value/2,
    get_value/3
]).


% Builds a #rep{} record, including its replication id, from the EJSON body
% of a replication request/document.
parse_rep_doc({Props} = RepObj, UserCtx) ->
    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
    Options = make_options(Props),
    ParseDb = fun(Key) ->
        parse_rep_db(get_value(Key, Props), ProxyParams, Options)
    end,
    Source = ParseDb(<<"source">>),
    Target = ParseDb(<<"target">>),
    Rep = #rep{
        source = Source,
        target = Target,
        options = Options,
        user_ctx = UserCtx,
        doc = RepObj
    },
    {ok, Rep#rep{id = replication_id(Rep)}}.


% Stores the given key/value pairs in the latest revision of a replication
% document. Nothing is done when the document has no _id or can no longer be
% opened.
update_rep_doc({Props} = _RepDoc, KVs) ->
    case get_value(<<"_id">>, Props) of
    undefined ->
        ok;
    RepDocId ->
        {ok, RepDb} = ensure_rep_db_exists(),
        case couch_db:open_doc(RepDb, RepDocId, []) of
        {ok, LatestRepDoc} ->
            update_rep_doc(RepDb, LatestRepDoc, KVs);
        _ ->
            ok
        end,
        couch_db:close(RepDb)
    end.

update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
    % a change of _replication_state is stamped with the current Unix time
    StoreKV =
        fun({<<"_replication_state">> = K, _V} = KV, Body) ->
            WithState = lists:keystore(K, 1, Body, KV),
            {Mega, Secs, _} = erlang:now(),
            UnixTime = Mega * 1000000 + Secs,
            lists:keystore(
                <<"_replication_state_time">>, 1,
                WithState, {<<"_replication_state_time">>, UnixTime});
        ({K, _V} = KV, Body) ->
            lists:keystore(K, 1, Body, KV)
        end,
    NewRepDocBody = lists:foldl(StoreKV, RepDocBody, KVs),
    % might not succeed - when the replication doc is deleted right
    % before this update (not an error)
    couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []).


% Opens the replicator database, creating it when it cannot be opened, and
% makes sure its design document exists. Returns {ok, Db}.
ensure_rep_db_exists() ->
    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
    Opts = [
        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
        sys_db
    ],
    Db = case couch_db:open(DbName, Opts) of
    {ok, OpenedDb} ->
        OpenedDb;
    _Error ->
        {ok, CreatedDb} = couch_db:create(DbName, Opts),
        CreatedDb
    end,
    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
    {ok, Db}.


% Creates the design document holding the validation function of the
% replicator database, unless a document with that id can be opened.
ensure_rep_ddoc_exists(RepDb, DDocID) ->
    case couch_db:open_doc(RepDb, DDocID, []) of
    {ok, _Doc} ->
        ok;
    _ ->
        DDocProps = [
            {<<"_id">>, DDocID},
            {<<"language">>, <<"javascript">>},
            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
        ],
        DDoc = couch_doc:from_json_obj({DDocProps}),
        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
    end,
    ok.


% Full replication id: {BaseId, Extension}, the extension naming the
% continuous / create_target options that are set.
replication_id(#rep{options = Options} = Rep) ->
    BaseId = replication_id(Rep, ?REP_ID_VERSION),
    Extension = maybe_append_options([continuous, create_target], Options),
    {BaseId, Extension}.


% Versioned clauses for generating replication IDs.
% If a change is made to how replications are identified,
% please add a new clause and increase ?REP_ID_VERSION.

replication_id(#rep{user_ctx = UserCtx, source = Source, target = Target} = Rep,
    2) ->
    {ok, HostName} = inet:gethostname(),
    Port = mochiweb_socket_server:get(couch_httpd, port),
    Src = get_rep_endpoint(UserCtx, Source),
    Tgt = get_rep_endpoint(UserCtx, Target),
    maybe_append_filters([HostName, Port, Src, Tgt], Rep);

replication_id(#rep{user_ctx = UserCtx, source = Source, target = Target} = Rep,
    1) ->
    {ok, HostName} = inet:gethostname(),
    Src = get_rep_endpoint(UserCtx, Source),
    Tgt = get_rep_endpoint(UserCtx, Target),
    maybe_append_filters([HostName, Src, Tgt], Rep).


% Extends the base term with the filter code and query params, or with the
% list of doc ids, and returns the hex encoded MD5 of the result.
maybe_append_filters(Base,
    #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
    Extra = case get_value(filter, Options) of
    undefined ->
        case get_value(doc_ids, Options) of
        undefined ->
            [];
        DocIds ->
            [DocIds]
        end;
    Filter ->
        [filter_code(Filter, Source, UserCtx),
            get_value(query_params, Options, {[]})]
    end,
    couch_util:to_hex(couch_util:md5(term_to_binary(Base ++ Extra))).


% Fetches the source code of the filter function named "DDocName/FilterName"
% from the source database, with leading and trailing blanks removed.
filter_code(Filter, Source, UserCtx) ->
    {match, [DDocName, FilterName]} =
        re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
    {ok, Db} = couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}]),
    try
        {ok, #doc{body = Body}} =
            couch_api_wrap:open_doc(Db, <<"_design/", DDocName/binary>>, []),
        Code = couch_util:get_nested_json_value(
            Body, [<<"filters">>, FilterName]),
        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
    after
        % the database is closed even when the lookup above fails
        couch_api_wrap:db_close(Db)
    end.


% Returns the concatenation of "+Option" for every option of Options that is
% set to true in RepOptions (e.g. "+continuous+create_target").
maybe_append_options(Options, RepOptions) ->
    lists:flatmap(
        fun(Option) ->
            case get_value(Option, RepOptions, false) of
            true ->
                "+" ++ atom_to_list(Option);
            false ->
                ""
            end
        end,
        Options).


% Term identifying a replication endpoint inside a replication id. Remote
% endpoints are identified by URL, non-default headers and, when present,
% the OAuth settings; local ones by database name and user context.
get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    case OAuth of
    nil ->
        {remote, Url, Headers -- DefaultHeaders};
    #oauth{} ->
        {remote, Url, Headers -- DefaultHeaders, OAuth}
    end;
% A local database is given by its name, a binary.
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
    {local, DbName, UserCtx}.


% Parses the source/target property of a replication document.
%
% An object or an http(s):// URL describes a remote database and yields an
% #httpdb{} record; any other binary is taken as the name of a local
% database and returned as is.
parse_rep_db({Props}, ProxyParams, Options) ->
    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    OAuth = case get_value(<<"oauth">>, AuthProps) of
    undefined ->
        nil;
    {OauthProps} ->
        #oauth{
            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
            token = ?b2l(get_value(<<"token">>, OauthProps)),
            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
            signature_method =
                case get_value(<<"signature_method">>, OauthProps) of
                undefined -> hmac_sha1;
                <<"PLAINTEXT">> -> plaintext;
                <<"HMAC-SHA1">> -> hmac_sha1;
                <<"RSA-SHA1">> -> rsa_sha1
                end
        }
    end,
    #httpdb{
        url = Url,
        oauth = OAuth,
        % headers given in the document take precedence over the defaults
        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
        ibrowse_options = lists:keysort(1,
            [{socket_options, get_value(socket_options, Options)} |
                ProxyParams ++ ssl_params(Url)]),
        timeout = get_value(connection_timeout, Options),
        http_connections = get_value(http_connections, Options),
        http_pipeline_size = get_value(http_pipeline_size, Options)
    };
parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
% Anything else that is a binary is the name of a local database.
parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
    DbName.


% Returns the URL as a string (list) that ends with a slash.
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(?b2l(Url));
maybe_add_trailing_slash(Url) ->
    case lists:last(Url) of
    $/ ->
        Url;
    _ ->
        Url ++ "/"
    end.


% Converts the properties of a replication document into a key-sorted option
% list. Options missing from the document are filled in from the
% "replicator" section of the server configuration.
make_options(Props) ->
    Options = lists:ukeysort(1, convert_options(Props)),
    Config = fun(Key, Default) ->
        couch_config:get("replicator", Key, Default)
    end,
    DefWorkers = Config("worker_processes", "4"),
    DefBatchSize = Config("worker_batch_size", "1000"),
    DefConns = Config("http_connections", "20"),
    DefPipeSize = Config("http_pipeline_size", "50"),
    DefTimeout = Config("connection_timeout", "30000"),
    {ok, DefSocketOptions} = couch_util:parse_term(
        Config("socket_options", "[{keepalive, true}, {nodelay, false}]")),
    % the defaults must stay sorted by key for lists:ukeymerge/3
    Defaults = [
        {connection_timeout, list_to_integer(DefTimeout)},
        {http_connections, list_to_integer(DefConns)},
        {http_pipeline_size, list_to_integer(DefPipeSize)},
        {socket_options, DefSocketOptions},
        {worker_batch_size, list_to_integer(DefBatchSize)},
        {worker_processes, list_to_integer(DefWorkers)}
    ],
    lists:ukeymerge(1, Options, Defaults).


% Maps the known properties of a replication document to options with atom
% keys; unknown properties are skipped.
convert_options([])->
    [];
convert_options([{<<"cancel">>, V} | R]) ->
    [{cancel, V} | convert_options(R)];
convert_options([{<<"create_target">>, V} | R]) ->
    [{create_target, V} | convert_options(R)];
convert_options([{<<"continuous">>, V} | R]) ->
    [{continuous, V} | convert_options(R)];
convert_options([{<<"filter">>, V} | R]) ->
    [{filter, V} | convert_options(R)];
convert_options([{<<"query_params">>, V} | R]) ->
    [{query_params, V} | convert_options(R)];
convert_options([{<<"doc_ids">>, V} | R]) ->
    % Ensure same behaviour as old replicator: accept a list of percent
    % encoded doc IDs.
    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
    [{doc_ids, DocIds} | convert_options(R)];
convert_options([{<<"worker_processes">>, V} | R]) ->
    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"worker_batch_size">>, V} | R]) ->
    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"http_connections">>, V} | R]) ->
    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"http_pipeline_size">>, V} | R]) ->
    [{http_pipeline_size, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"connection_timeout">>, V} | R]) ->
    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"socket_options">>, V} | R]) ->
    {ok, SocketOptions} = couch_util:parse_term(V),
    [{socket_options, SocketOptions} | convert_options(R)];
convert_options([_ | R]) -> % skip unknown option
    convert_options(R).


% Turns a proxy URL into ibrowse proxy options. Credentials are included
% only when both user name and password are part of the URL.
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) ->
    #url{
        host = Host,
        port = Port,
        username = User,
        password = Passwd
    } = ibrowse_lib:parse_url(ProxyUrl),
    Credentials = case is_list(User) andalso is_list(Passwd) of
    true ->
        [{proxy_user, User}, {proxy_password, Passwd}];
    false ->
        []
    end,
    [{proxy_host, Host}, {proxy_port, Port}] ++ Credentials.


% ibrowse SSL options for https URLs, built from the "replicator" section of
% the configuration; the empty list for http URLs.
ssl_params(Url) ->
    case ibrowse_lib:parse_url(Url) of
    #url{protocol = http} ->
        [];
    #url{protocol = https} ->
        Depth = list_to_integer(
            couch_config:get("replicator", "ssl_certificate_max_depth", "3")
        ),
        VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
        VerifyOpts = ssl_verify_options(VerifyCerts =:= "true"),
        [{is_ssl, true}, {ssl_options, [{depth, Depth} | VerifyOpts]}]
    end.

% The form of the verify options depends on the OTP release.
ssl_verify_options(Value) ->
    ssl_verify_options(Value, erlang:system_info(otp_release)).

% Releases that compare >= "R14" take the atom form of the verify option,
% the others the numeric one.
ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    [{verify, verify_peer}, {cacertfile, CAFile}];
ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
    [{verify, verify_none}];
ssl_verify_options(true, _OTPVersion) ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    [{verify, 2}, {cacertfile, CAFile}];
ssl_verify_options(false, _OTPVersion) ->
    [{verify, 0}].


% Opens a fresh handle for a local database, using the name, user context
% and options of the given one. Remote databases are returned unchanged.
open_db(#db{name = Name, user_ctx = UserCtx, options = Options}) ->
    OpenOptions = [{user_ctx, UserCtx} | Options],
    {ok, Db} = couch_db:open(Name, OpenOptions),
    Db;
open_db(HttpDb) ->
    HttpDb.


% Closes a local database handle; a no-op for remote databases.
close_db(#db{} = Db) ->
    couch_db:close(Db);
close_db(_HttpDb) ->
    ok.


% For a local database, starts a notifier that casts {db_compacted, DbName}
% to Server whenever that database is compacted. Returns nil for anything
% else.
start_db_compaction_notifier(#db{name = DbName}, Server) ->
    OnEvent =
        fun({compacted, Name}) when Name =:= DbName ->
            ok = gen_server:cast(Server, {db_compacted, DbName});
        (_) ->
            ok
        end,
    {ok, Notifier} = couch_db_update_notifier:start_link(OnEvent),
    Notifier;
start_db_compaction_notifier(_, _) ->
    nil.


stop_db_compaction_notifier(nil) ->
    ok;
stop_db_compaction_notifier(Notifier) ->
    couch_db_update_notifier:stop(Notifier).

Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Wed Feb 16 20:05:31 2011
@@ -59,8 +59,8 @@ close(Wq) ->
 
 init(Options) ->
     Q = #q{
-        max_size = couch_util:get_value(max_size, Options),
-        max_items = couch_util:get_value(max_items, Options),
+        max_size = couch_util:get_value(max_size, Options, nil),
+        max_items = couch_util:get_value(max_items, Options, nil),
         multi_workers = couch_util:get_value(multi_workers, Options, false)
     },
     {ok, Q}.
@@ -71,7 +71,7 @@ terminate(_Reason, #q{work_waiters=Worke handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) -> - Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)), + Q = Q0#q{size = increment_queue_size(Q0, Item), items = Q0#q.items + 1, queue = queue:in(Item, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse @@ -153,3 +153,10 @@ code_change(_OldVsn, State, _Extra) -> handle_info(X, Q) -> {stop, X, Q}. + +increment_queue_size(#q{max_size = nil, size = Size}, _Item) -> + Size; +increment_queue_size(#q{size = Size}, Item) when is_binary(Item) -> + Size + byte_size(Item); +increment_queue_size(#q{size = Size}, Item) -> + Size + byte_size(term_to_binary(Item)). Added: couchdb/trunk/src/couchdb/json_stream_parse.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/json_stream_parse.erl?rev=1071375&view=auto ============================================================================== --- couchdb/trunk/src/couchdb/json_stream_parse.erl (added) +++ couchdb/trunk/src/couchdb/json_stream_parse.erl Wed Feb 16 20:05:31 2011 @@ -0,0 +1,432 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(json_stream_parse). + + +-export([events/2, to_ejson/1, collect_object/2]). + +-define(IS_WS(X), (X == $\ orelse X == $\t orelse X == $\n orelse X == $\r)). +-define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)). +-define(IS_DIGIT(X), (X >= $0 andalso X =< $9)). + + + +% Parses the json into events. 
%
% The DataFun param is a function that produces the data for parsing. When
% called it must yield a tuple, or the atom done. The first element in the
% tuple is the data itself, and the second element is a function to be called
% next to get the next chunk of data in the stream.
%
% The EventFun is called every time a json element is parsed. It must produce
% a new function to be called for the next event.
%
% Events happen each time a new element in the json string is parsed.
% For simple value types, the data itself is returned:
% Strings
% Integers
% Floats
% true
% false
% null
%
% For arrays, the start of the array is signaled by the event array_start
% atom. The end is signaled by array_end. The events before the end are the
% values, or nested values.
%
% For objects, the start of the object is signaled by the event object_start
% atom. The end is signaled by object_end. Each key is signaled by
% {key, KeyString}, and the following event is the value, or start of the
% value (array_start, object_start).
%
events(Chars, EventFun) when is_list(Chars) ->
    % a string input is parsed as one single binary chunk
    events(list_to_binary(Chars), EventFun);
events(Bin, EventFun) when is_binary(Bin) ->
    % wrap the binary into a data function that yields it exactly once
    Exhausted = fun() -> done end,
    events(fun() -> {Bin, Exhausted} end, EventFun);
events(DataFun, EventFun) ->
    parse_one(DataFun, EventFun, <<>>).

% Converts the JSON directly to the erlang representation of Json.
%
% All events are first accumulated (newest first) by collect_events/2 and
% then folded into an EJson term by make_ejson/2.
% Note: the first match expects events/2 to return a 3-tuple, so an input
% holding no token at all (events/2 returns none) fails with badmatch.
to_ejson(DF) ->
    Collector = fun(Ev) -> collect_events(Ev, []) end,
    {_DF2, LastEF, _Rest} = events(DF, Collector),
    RevEvents = LastEF(get_results),
    [[EJson]] = make_ejson(RevEvents, [[]]),
    EJson.


% This function is used to return complete objects while parsing streams.
%
% Return this function from inside an event function right after getting an
% object_start event. It then collects the remaining events for that object
% and converts it to the erlang representation of Json.
%
% It then calls your ReturnControl function with the erlang object. Your
% return control function then should yield another event function.
%
% This example stream parses an array of objects, calling
% fun do_something_with_the_object/1 for each object.
%
%    ev_array(array_start) ->
%        fun(Ev) -> ev_object_loop(Ev) end.
%
%    ev_object_loop(object_start) ->
%        fun(Ev) ->
%            json_stream_parse:collect_object(Ev,
%                fun(Obj) ->
%                    do_something_with_the_object(Obj),
%                    fun(Ev2) -> ev_object_loop(Ev2) end
%                end)
%        end;
%    ev_object_loop(array_end) ->
%        ok.
%
%    % invoke the parse
%    main() ->
%        ...
%        events(Data, fun(Ev) -> ev_array(Ev) end).

collect_object(Ev, ReturnControl) ->
    % nesting level starts at 0; the object_start event that preceded Ev is
    % seeded into the accumulator
    collect_object(Ev, 0, ReturnControl, [object_start]).



% internal methods

% Parses a single JSON value from the stream and feeds its events to EF.
% Returns none if the stream ends before a token starts, otherwise
% {DF2, EF2, Rest}: the remaining data function, the next event function and
% the unconsumed part of the current chunk.
parse_one(DF, EF, Acc) ->
    case toke(DF, Acc) of
    none ->
        none;
    {Token, DF2, Rest} ->
        emit_value(Token, DF2, EF, Rest)
    end.

% Dispatches on the first token of a value.
emit_value("{", DF, EF, Rest) ->
    InObject = EF(object_start),
    {DF2, AfterMembers, Rest2} = parse_object(DF, InObject, Rest),
    {DF2, AfterMembers(object_end), Rest2};
emit_value("[", DF, EF, Rest) ->
    InArray = EF(array_start),
    {DF2, AfterItems, Rest2} = parse_array(DF, InArray, Rest),
    {DF2, AfterItems(array_end), Rest2};
emit_value(Scalar, DF, EF, Rest)
        when is_number(Scalar); is_atom(Scalar); is_binary(Scalar) ->
    % numbers, true/false/null and strings are passed on as they are
    {DF, EF(Scalar), Rest};
emit_value(_OtherToken, _DF, _EF, _Rest) ->
    err(unexpected_token).

% Like parse_one/3, but the end of the stream is a parse error (Error).
must_parse_one(DF, EF, Acc, Error) ->
    found_or_err(parse_one(DF, EF, Acc), Error).

% Like toke/2, but the end of the stream is a parse error (Error).
must_toke(DF, Data, Error) ->
    found_or_err(toke(DF, Data), Error).

% Turns the end-of-stream marker none into a thrown parse error.
found_or_err(none, Error) ->
    err(Error);
found_or_err(Found, _Error) ->
    Found.

% Tokenizer: extracts the next token from the stream.
%
% DF is the data function (yields {Binary, NextDF} or done); the second
% argument is the not yet consumed part of the current chunk.
%
% Returns none when the stream ends before a token starts, otherwise
% {Token, DF2, Rest} where Token is one of:
%   - a one character string for punctuation: "{" "}" "[" "]" "," ":"
%   - a binary for a JSON string
%   - an integer or a float for a JSON number
%   - the atoms true, false or null
% Any other leading byte throws {parse_error, bad_token}.
toke(DF, <<>>) ->
    case DF() of
    done ->
        none;
    {Data, DF2} ->
        toke(DF2, Data)
    end;
toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
    % skip whitespace between tokens
    toke(DF, Rest);
toke(DF, <<${,Rest/binary>>) ->
    {"{", DF, Rest};
toke(DF, <<$},Rest/binary>>) ->
    {"}", DF, Rest};
toke(DF, <<$[,Rest/binary>>) ->
    {"[", DF, Rest};
toke(DF, <<$],Rest/binary>>) ->
    {"]", DF, Rest};
toke(DF, <<$",Rest/binary>>) ->
    toke_string(DF,Rest,[]);
toke(DF, <<$,,Rest/binary>>) ->
    {",", DF, Rest};
toke(DF, <<$:,Rest/binary>>) ->
    {":", DF, Rest};
toke(DF, <<$-,Rest/binary>>) ->
    % a minus sign must be followed by a digit; fetch more data if the
    % chunk ended right after the sign
    {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
    case ?IS_DIGIT(C) of
    true ->
        toke_number_leading(DF2, Data, "-");
    false ->
        err(expected_number)
    end;
toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
    toke_number_leading(DF, Data, []);
toke(DF, <<$t,Rest/binary>>) ->
    {Data, DF2} = must_match(<<"rue">>, DF, Rest),
    {true, DF2, Data};
toke(DF, <<$f,Rest/binary>>) ->
    {Data, DF2} = must_match(<<"alse">>, DF, Rest),
    {false, DF2, Data};
toke(DF, <<$n,Rest/binary>>) ->
    {Data, DF2} = must_match(<<"ull">>, DF, Rest),
    {null, DF2, Data};
toke(_, _) ->
    err(bad_token).


% Requires the stream to continue with exactly the bytes of Pattern.
% Returns {RemainingData, DF2}; throws {parse_error, bad_token} when the
% data differs or the stream ends too early.
must_match(Pattern, DF, Data) ->
    Size = byte_size(Pattern),
    case must_df(DF, Size, Data, bad_token) of
    {<<Pattern:Size/binary,Data2/binary>>, DF2} ->
        % Pattern is already bound, so this only matches an equal prefix
        {Data2, DF2};
    {_, _} ->
        err(bad_token)
    end.

% Fetches the next chunk; the end of the stream is a parse error (Error).
must_df(DF,Error)->
    case DF() of
    done ->
        err(Error);
    {Data, DF2} ->
        {Data, DF2}
    end.


% Makes sure at least NeedLen bytes are available, appending further chunks
% to Acc as needed. Returns {Data, DF2}; throws {parse_error, Error} if the
% stream ends first.
must_df(DF,NeedLen,Acc,Error)->
    if byte_size(Acc) >= NeedLen ->
        {Acc, DF};
    true ->
        case DF() of
        done ->
            err(Error);
        {Data, DF2} ->
            must_df(DF2, NeedLen, <<Acc/binary, Data/binary>>, Error)
        end
    end.


% Parses the members of an object, up to and including the closing "}".
% The opening "{" has already been consumed by the caller, which also emits
% the object_start / object_end events. Emits {key, Key} followed by the
% events of the value for every member.
% Returns {DF2, EF2, Rest}.
% Note: since this function is re-entered after a ",", a trailing comma
% directly before the "}" is accepted.
parse_object(DF, EF, Acc) ->
    object_member(must_toke(DF, Acc, unterminated_object), EF).

% Handles the token found where a member key (or the end) is expected.
object_member({Key, DF, Rest}, EF) when is_binary(Key) ->
    AfterKey = EF({key, Key}),
    case must_toke(DF, Rest, unterminated_object) of
    {":", DF2, Rest2} ->
        {DF3, AfterValue, Rest3} =
            must_parse_one(DF2, AfterKey, Rest2, expected_value),
        object_next(must_toke(DF3, Rest3, unterminated_object), AfterValue);
    _Else ->
        err(expected_colon)
    end;
object_member({"}", DF, Rest}, EF) ->
    {DF, EF, Rest};
object_member({_, _, _}, _EF) ->
    err(unexpected_token).

% Handles the token following a member value: "," continues, "}" ends.
object_next({",", DF, Rest}, EF) ->
    parse_object(DF, EF, Rest);
object_next({"}", DF, Rest}, EF) ->
    {DF, EF, Rest};
object_next({_, _, _}, _EF) ->
    err(unexpected_token).

% Expects the separator after an array element: "," continues with the next
% element, "]" ends the array. Returns {DF2, EF, Rest}.
parse_array0(DF, EF, Acc) ->
    array_separator(toke(DF, Acc), EF).

array_separator(none, _EF) ->
    err(unterminated_array);
array_separator({",", DF, Rest}, EF) ->
    parse_array(DF, EF, Rest);
array_separator({"]", DF, Rest}, EF) ->
    {DF, EF, Rest};
array_separator(_, _EF) ->
    err(unexpected_token).

% Parses the elements of an array, up to and including the closing "]".
% The opening "[" has already been consumed by the caller, which also emits
% the array_start / array_end events of this array.
% Returns {DF2, EF2, Rest}.
% Note: since this function is re-entered after a ",", a trailing comma
% directly before the "]" is accepted.
parse_array(DF, EF, Acc) ->
    case toke(DF, Acc) of
    none ->
        err(unterminated_array);
    {Token, DF2, Rest} ->
        array_element(Token, DF2, EF, Rest)
    end.

% Dispatches on the first token of an array element.
array_element("{", DF, EF, Rest) ->
    InObject = EF(object_start),
    {DF2, AfterMembers, Rest2} = parse_object(DF, InObject, Rest),
    parse_array0(DF2, AfterMembers(object_end), Rest2);
array_element("[", DF, EF, Rest) ->
    InArray = EF(array_start),
    {DF2, AfterItems, Rest2} = parse_array(DF, InArray, Rest),
    parse_array0(DF2, AfterItems(array_end), Rest2);
array_element("]", DF, EF, Rest) ->
    {DF, EF, Rest};
array_element(Scalar, DF, EF, Rest)
        when is_number(Scalar); is_atom(Scalar); is_binary(Scalar) ->
    parse_array0(DF, EF(Scalar), Rest);
array_element(_, _DF, _EF, _Rest) ->
    err(unexpected_token).


% Reads the rest of a JSON string; the opening quote is already consumed.
%
% Acc holds the decoded bytes in reverse order. Escape sequences are
% decoded, \uXXXX is converted to its UTF-8 bytes. More data is requested
% from DF whenever the current chunk ends inside the string (also in the
% middle of an escape sequence).
% Returns {StringBinary, DF2, Rest}. Throws {parse_error, Reason} with
% Reason being unterminated_string, missing_hex, invalid_utf_char or
% bad_escape.
toke_string(DF, <<>>, Acc) ->
    {Data, DF2} = must_df(DF, unterminated_string),
    toke_string(DF2, Data, Acc);
toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$" | Acc]);
toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\\ | Acc]);
toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$/ | Acc]);
toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\b | Acc]);
toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\f | Acc]);
toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\n | Acc]);
toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\r | Acc]);
toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [$\t | Acc]);
toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
    % the 4 hex digits may be spread over several chunks
    {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
    UTFChar = erlang:list_to_integer([A, B, C, D], 16),
    if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
        err(invalid_utf_char);
    true ->
        ok
    end,
    Chars = xmerl_ucs:to_utf8(UTFChar),
    % Acc is reversed, so the UTF-8 bytes are prepended reversed as well
    toke_string(DF2, Data, lists:reverse(Chars) ++ Acc);
toke_string(DF, <<$\\>>, Acc) ->
    % chunk ends right after the backslash: fetch more data and retry
    {Data, DF2} = must_df(DF, unterminated_string),
    toke_string(DF2, <<$\\,Data/binary>>, Acc);
toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
    err(bad_escape);
toke_string(DF, <<$", Rest/binary>>, Acc) ->
    {list_to_binary(lists:reverse(Acc)), DF, Rest};
toke_string(DF, <<C, Rest/binary>>, Acc) ->
    toke_string(DF, Rest, [C | Acc]).


% Reads the integer part of a number.
%
% Acc holds the characters seen so far in reverse order (it starts as "-"
% or [] in toke/2). The number ends at whitespace, at a delimiter
% ("}", "]" or ",") or at the end of the stream; the terminating byte is
% left in Rest. A "." switches to the fraction, "e"/"E" to the exponent.
% Returns {Integer, DF2, Rest} for a plain integer; otherwise the result of
% toke_number_trailing/3 or toke_number_exponent/3.
toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
        when ?IS_DIGIT(Digit) ->
    toke_number_leading(DF, Rest, [Digit | Acc]);
toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
    {list_to_integer(lists:reverse(Acc)), DF, Rest};
toke_number_leading(DF, <<>>, Acc) ->
    case DF() of
    done ->
        {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
    {Data, DF2} ->
        toke_number_leading(DF2, Data, Acc)
    end;
toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
    toke_number_trailing(DF, Rest, [$.|Acc]);
toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
    % list_to_float/1 needs a fraction, so "1e1" is turned into "1.0e1"
    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
toke_number_leading(_, _, _) ->
    err(unexpected_character_in_number).

% Reads the fraction digits of a number, following the ".".
%
% Acc holds the characters seen so far in reverse order. An exponent marker
% is only accepted after at least one fraction digit (the last character
% must not be the ".").
% Returns {Float, DF2, Rest}.
toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
        when ?IS_DIGIT(Digit) ->
    toke_number_trailing(DF, Rest, [Digit | Acc]);
toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
    {list_to_float(lists:reverse(Acc)), DF, Rest};
toke_number_trailing(DF, <<>>, Acc) ->
    case DF() of
    done ->
        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
    {Data, DF2} ->
        toke_number_trailing(DF2, Data, Acc)
    end;
toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
    toke_number_exponent(DF, Rest, [$e|Acc]);
toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
    toke_number_exponent(DF, Rest, [$e|Acc]);
toke_number_trailing(_, _, _) ->
    err(unexpected_character_in_number).


% Reads the exponent of a number, following the "e".
%
% Acc holds the characters seen so far in reverse order. A sign is only
% accepted directly after the "e". The number ends at whitespace, at a
% delimiter or at the end of the stream.
% Returns {Float, DF2, Rest}.
toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
    toke_number_exponent(DF, Rest, [Digit | Acc]);
toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
        when Sign == $+ orelse Sign == $- ->
    toke_number_exponent(DF, Rest, [Sign | Acc]);
toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
    {list_to_float(lists:reverse(Acc)), DF, Rest};
toke_number_exponent(DF, <<>>, Acc) ->
    case DF() of
    done ->
        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
    {Data, DF2} ->
        toke_number_exponent(DF2, Data, Acc)
    end;
toke_number_exponent(_, _, _) ->
    err(unexpected_character_in_number).


% All parse errors are reported by throwing {parse_error, Error}.
err(Error)->
    throw({parse_error,Error}).


% Builds the EJson term out of a list of events.
%
% The events must be given in reverse order (last event first), which is
% how collect_events/2 and collect_object/4 accumulate them. Stack is a
% list of value lists, one per nesting level currently open:
%   - an *_end event (seen first) opens a level by pushing an empty list
%   - plain values are prepended to the list of the current level
%   - {key, K} pairs K with the value that was prepended just before
%   - an *_start event closes the level and prepends the collected values,
%     as a list (array) or as {Props} (object), to the level below
% Prepending while walking backwards restores the original order.
% Called with the initial stack [[]], the result has the shape [[EJson]].
make_ejson([], Stack) ->
    Stack;
make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
    make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
make_ejson([array_end | RevEvs], Stack) ->
    make_ejson(RevEvs, [[] | Stack]);
make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
    make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
make_ejson([object_end | RevEvs], Stack) ->
    make_ejson(RevEvs, [[] | Stack]);
make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack]) ->
    make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
make_ejson([Value | RevEvs], [Vals | RestStack]) ->
    make_ejson(RevEvs, [[Value | Vals] | RestStack]).

% Event function accumulating all events, newest first. The pseudo event
% get_results returns the accumulated list instead of a new event function.
collect_events(get_results, Acc) ->
    Acc;
collect_events(Ev, Acc) ->
    fun(NextEv) -> collect_events(NextEv, [Ev | Acc]) end.
+ + +collect_object(object_end, 0, ReturnControl, Acc) -> + [[Obj]] = make_ejson([object_end | Acc], [[]]), + ReturnControl(Obj); +collect_object(object_end, NestCount, ReturnControl, Acc) -> + fun(Ev) -> + collect_object(Ev, NestCount - 1, ReturnControl, [object_end | Acc]) + end; +collect_object(object_start, NestCount, ReturnControl, Acc) -> + fun(Ev) -> + collect_object(Ev, NestCount + 1, ReturnControl, [object_start | Acc]) + end; +collect_object(Ev, NestCount, ReturnControl, Acc) -> + fun(Ev2) -> + collect_object(Ev2, NestCount, ReturnControl, [Ev | Acc]) + end. Modified: couchdb/trunk/test/etap/001-load.t URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/001-load.t?rev=1071375&r1=1071374&r2=1071375&view=diff ============================================================================== --- couchdb/trunk/test/etap/001-load.t (original) +++ couchdb/trunk/test/etap/001-load.t Wed Feb 16 20:05:31 2011 @@ -17,7 +17,7 @@ main(_) -> test_util:init_code_path(), - etap:plan(37), + etap:plan(40), Modules = [ couch_btree, couch_config, @@ -43,7 +43,10 @@ main(_) -> couch_os_process, couch_query_servers, couch_ref_counter, - couch_rep, + couch_replicator, + couch_replicator_doc_copier, + couch_replicator_rev_finder, + couch_replicator_utils, couch_rep_sup, couch_server, couch_server_sup, Added: couchdb/trunk/test/etap/190-json-stream-parse.t URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/190-json-stream-parse.t?rev=1071375&view=auto ============================================================================== --- couchdb/trunk/test/etap/190-json-stream-parse.t (added) +++ couchdb/trunk/test/etap/190-json-stream-parse.t Wed Feb 16 20:05:31 2011 @@ -0,0 +1,184 @@ +#!/usr/bin/env escript +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +main(_) -> + test_util:init_code_path(), + etap:plan(99), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag("Test died abnormally: ~p", [Other]), + etap:bail("Bad return value.") + end, + ok. + +test() -> + crypto:start(), + ok = test_raw_json_input(), + ok = test_1_byte_data_function(), + ok = test_multiple_bytes_data_function(). + + +test_raw_json_input() -> + etap:diag("Tests with raw JSON string as the input."), + lists:foreach( + fun({EJson, JsonString, Desc}) -> + etap:is( + equiv(EJson, json_stream_parse:to_ejson(JsonString)), + true, + Desc) + end, + cases()), + ok. + + +test_1_byte_data_function() -> + etap:diag("Tests with a 1 byte output data function as the input."), + lists:foreach( + fun({EJson, JsonString, Desc}) -> + DataFun = fun() -> single_byte_data_fun(JsonString) end, + etap:is( + equiv(EJson, json_stream_parse:to_ejson(DataFun)), + true, + Desc) + end, + cases()), + ok. + + +test_multiple_bytes_data_function() -> + etap:diag("Tests with a multiple bytes output data function as the input."), + lists:foreach( + fun({EJson, JsonString, Desc}) -> + DataFun = fun() -> multiple_bytes_data_fun(JsonString) end, + etap:is( + equiv(EJson, json_stream_parse:to_ejson(DataFun)), + true, + Desc) + end, + cases()), + ok. 


%% Test vectors: {ExpectedEJson, JsonString, Description}.
cases() ->
    [
        {1, "1", "integer numeric literial"},
        {3.1416, "3.14160", "float numeric literal"}, % text representation may truncate, trail zeroes
        {-1, "-1", "negative integer numeric literal"},
        {-3.1416, "-3.14160", "negative float numeric literal"},
        {12.0e10, "1.20000e+11", "float literal in scientific notation"},
        {1.234E+10, "1.23400e+10", "another float literal in scientific notation"},
        {-1.234E-10, "-1.23400e-10", "negative float literal in scientific notation"},
        {10.0, "1.0e+01", "yet another float literal in scientific notation"},
        {123.456, "1.23456E+2", "yet another float literal in scientific notation"},
        {10.0, "1e1", "yet another float literal in scientific notation"},
        {<<"foo">>, "\"foo\"", "string literal"},
        {<<"foo", 5, "bar">>, "\"foo\\u0005bar\"", "string literal with \\u0005"},
        {<<"">>, "\"\"", "empty string literal"},
        {<<"\n\n\n">>, "\"\\n\\n\\n\"", "only new lines literal"},
        {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\"",
            "only white spaces string literal"},
        {null, "null", "null literal"},
        {true, "true", "true literal"},
        {false, "false", "false literal"},
        {<<"null">>, "\"null\"", "null string literal"},
        {<<"true">>, "\"true\"", "true string literal"},
        {<<"false">>, "\"false\"", "false string literal"},
        {{[]}, "{}", "empty object literal"},
        {{[{<<"foo">>, <<"bar">>}]}, "{\"foo\":\"bar\"}",
            "simple object literal"},
        {{[{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]},
            "{\"foo\":\"bar\",\"baz\":123}", "another simple object literal"},
        {[], "[]", "empty array literal"},
        {[[]], "[[]]", "empty array literal inside a single element array literal"},
        {[1, <<"foo">>], "[1,\"foo\"]", "simple non-empty array literal"},
        {[1199344435545.0, 1], "[1199344435545.0,1]",
            "another simple non-empty array literal"},
        {[false, true, 321, null], "[false, true, 321, null]", "array of literals"},
        {{[{<<"foo">>, [123]}]}, "{\"foo\":[123]}",
            "object literal with an array valued property"},
        {{[{<<"foo">>, {[{<<"bar">>, true}]}}]},
            "{\"foo\":{\"bar\":true}}", "nested object literal"},
        {{[{<<"foo">>, []}, {<<"bar">>, {[{<<"baz">>, true}]}},
                {<<"alice">>, <<"bob">>}]},
            "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}",
            "complex object literal"},
        {[-123, <<"foo">>, {[{<<"bar">>, []}]}, null],
            "[-123,\"foo\",{\"bar\":[]},null]",
            "complex array literal"}
    ].


%% Test for equivalence of Erlang terms.
%% Due to arbitrary order of construction, equivalent objects might
%% compare unequal as erlang terms, so we need to carefully recurse
%% through aggregates (lists and objects).
%% Numbers are compared with ==, so an integer equals the float of the
%% same value. Terms of different kinds match no clause and crash with
%% function_clause.
equiv({Props1}, {Props2}) ->
    equiv_object(Props1, Props2);
equiv(L1, L2) when is_list(L1), is_list(L2) ->
    equiv_list(L1, L2);
equiv(N1, N2) when is_number(N1), is_number(N2) ->
    N1 == N2;
equiv(B1, B2) when is_binary(B1), is_binary(B2) ->
    B1 == B2;
equiv(true, true) ->
    true;
equiv(false, false) ->
    true;
equiv(null, null) ->
    true.


%% Object representation and traversal order is unknown.
%% Use the sledgehammer and sort property lists.
%% Returns true; any difference crashes (badmatch on the assertion, or an
%% error raised by lists:zip/2 when the property counts differ).
equiv_object(Props1, Props2) ->
    L1 = lists:keysort(1, Props1),
    L2 = lists:keysort(1, Props2),
    Pairs = lists:zip(L1, L2),
    true = lists:all(
        fun({{K1, V1}, {K2, V2}}) ->
            equiv(K1, K2) andalso equiv(V1, V2)
        end,
        Pairs).


%% Recursively compare list elements for equivalence.
equiv_list([], []) ->
    true;
equiv_list([V1 | L1], [V2 | L2]) ->
    equiv(V1, V2) andalso equiv_list(L1, L2).


%% Data function body handing out the JSON string one byte per chunk.
single_byte_data_fun([]) ->
    done;
single_byte_data_fun([H | T]) ->
    {<<H>>, fun() -> single_byte_data_fun(T) end}.


%% Data function body handing out the JSON string in chunks of random
%% length, N being drawn by crypto:rand_uniform(0, 7). A chunk may be
%% empty when N is 0.
multiple_bytes_data_fun([]) ->
    done;
multiple_bytes_data_fun(L) ->
    N = crypto:rand_uniform(0, 7),
    {Part, Rest} = split(L, N),
    {list_to_binary(Part), fun() -> multiple_bytes_data_fun(Rest) end}.

%% Splits L after its first N elements; a list of at most N elements is
%% returned whole.
split(L, N) when length(L) =< N ->
    {L, []};
split(L, N) ->
    take(N, L, []).
+ +take(0, L, Acc) -> + {lists:reverse(Acc), L}; +take(N, [H|L], Acc) -> + take(N - 1, L, [H | Acc]). Modified: couchdb/trunk/test/etap/Makefile.am URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/Makefile.am?rev=1071375&r1=1071374&r2=1071375&view=diff ============================================================================== --- couchdb/trunk/test/etap/Makefile.am (original) +++ couchdb/trunk/test/etap/Makefile.am Wed Feb 16 20:05:31 2011 @@ -57,10 +57,6 @@ EXTRA_DIST = \ 083-config-no-files.t \ 090-task-status.t \ 100-ref-counter.t \ - 110-replication-httpc.t \ - 111-replication-changes-feed.t \ - 112-replication-missing-revs.t \ - 113-replication-attachment-comp.t \ 120-stats-collect.t \ 121-stats-aggregates.cfg \ 121-stats-aggregates.ini \ @@ -81,4 +77,5 @@ EXTRA_DIST = \ 173-os-daemon-cfg-register.es \ 173-os-daemon-cfg-register.t \ 180-http-proxy.ini \ - 180-http-proxy.t + 180-http-proxy.t \ + 190-json-stream-parse.t