couchdb-commits mailing list archives

From fdman...@apache.org
Subject svn commit: r1071375 [3/3] - in /couchdb/trunk: etc/couchdb/ share/www/script/test/ src/couchdb/ test/etap/
Date Wed, 16 Feb 2011 20:05:32 GMT
Added: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,483 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_copier).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/5]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% TODO: maybe make both buffer max sizes configurable
+-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).   % for remote targets
+-define(DOC_BUFFER_LEN, 10).                 % for local targets, # of documents
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+-define(MAX_BULK_ATTS_PER_DOC, 8).
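+% A doc with more than ?MAX_BULK_ATTS_PER_DOC attachments, or with any
+% attachment larger than ?MAX_BULK_ATT_SIZE, is flushed on its own instead of
+% being added to the in-memory batch (see maybe_flush_docs/3).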
+
+-import(couch_replicator_utils, [
+    open_db/1,
+    close_db/1,
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1
+]).
+
+-record(batch, {
+    docs = [],
+    size = 0
+}).
+
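+% Worker state: `loop` is the pid of the queue_fetch_loop process, `cp` the
+% checkpointing parent, `readers` and `writer` the currently running document
+% fetch and flush processes, and `batch` accumulates documents until one of
+% the flush thresholds above is reached.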
+-record(state, {
+    loop,
+    cp,
+    max_parallel_conns,
+    source,
+    target,
+    readers = [],
+    writer = nil,
+    pending_fetch = nil,
+    flush_waiter = nil,
+    highest_seq_seen = ?LOWEST_SEQ,
+    stats = #rep_stats{},
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    batch = #batch{}
+}).
+
+
+
+start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) ->
+    Pid = spawn_link(
+        fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end),
+    {ok, Pid};
+
+start_link(Cp, Source, Target, MissingRevsQueue, MaxConns) ->
+    gen_server:start_link(
+        ?MODULE, {Cp, Source, Target, MissingRevsQueue, MaxConns}, []).
+
+
+init({Cp, Source, Target, MissingRevsQueue, MaxConns}) ->
+    process_flag(trap_exit, true),
+    Parent = self(),
+    LoopPid = spawn_link(
+        fun() -> queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) end
+    ),
+    State = #state{
+        cp = Cp,
+        max_parallel_conns = MaxConns,
+        loop = LoopPid,
+        source = open_db(Source),
+        target = open_db(Target),
+        source_db_compaction_notifier =
+            start_db_compaction_notifier(Source, self()),
+        target_db_compaction_notifier =
+            start_db_compaction_notifier(Target, self())
+    },
+    {ok, State}.
+
+
+handle_call({seq_done, Seq, RevCount}, {Pid, _},
+    #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) ->
+    NewState = State#state{
+        highest_seq_seen = lists:max([Seq, HighSeq]),
+        stats = Stats#rep_stats{
+            missing_checked = Stats#rep_stats.missing_checked + RevCount
+        }
+    },
+    {reply, ok, NewState};
+
+handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From,
+    #state{loop = Pid, readers = Readers, pending_fetch = nil,
+        highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt,
+        max_parallel_conns = MaxConns} = State) ->
+    Stats2 = Stats#rep_stats{
+        missing_checked = Stats#rep_stats.missing_checked + length(Revs),
+        missing_found = Stats#rep_stats.missing_found + length(Revs)
+    },
+    case length(Readers) of
+    Size when Size < MaxConns ->
+        Reader = spawn_doc_reader(Src, Tgt, Params),
+        NewState = State#state{
+            highest_seq_seen = lists:max([Seq, HighSeq]),
+            stats = Stats2,
+            readers = [Reader | Readers]
+        },
+        {reply, ok, NewState};
+    _ ->
+        NewState = State#state{
+            highest_seq_seen = lists:max([Seq, HighSeq]),
+            stats = Stats2,
+            pending_fetch = {From, Params}
+        },
+        {noreply, NewState}
+    end;
+
+handle_call({batch_doc, Doc}, From, State) ->
+    gen_server:reply(From, ok),
+    {noreply, maybe_flush_docs(Doc, State)};
+
+handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        docs_written = Stats#rep_stats.docs_written + 1
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + 1
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call({add_write_stats, Written, Failed}, _From,
+    #state{stats = Stats} = State) ->
+    NewStats = Stats#rep_stats{
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + Failed
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call(flush, {Pid, _} = From,
+    #state{loop = Pid, writer = nil, flush_waiter = nil,
+        target = Target, batch = Batch} = State) ->
+    State2 = case State#state.readers of
+    [] ->
+        State#state{writer = spawn_writer(Target, Batch)};
+    _ ->
+        State
+    end,
+    {noreply, State2#state{flush_waiter = From}}.
+
+
+handle_cast({db_compacted, DbName},
+    #state{source = #db{name = DbName} = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #state{target = #db{name = DbName} = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#state{target = NewTarget}};
+
+handle_cast(Msg, State) ->
+    {stop, {unexpected_async_call, Msg}, State}.
+
+
+handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
+    #state{
+        batch = #batch{docs = []}, readers = [], writer = nil,
+        pending_fetch = nil, flush_waiter = nil
+    } = State,
+    {stop, normal, State};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
+    {noreply, after_full_flush(State)};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
+    #state{
+        readers = Readers, writer = Writer, batch = Batch,
+        source = Source, target = Target,
+        pending_fetch = Fetch, flush_waiter = FlushWaiter
+    } = State,
+    case Readers -- [Pid] of
+    Readers ->
+        {noreply, State};
+    Readers2 ->
+        State2 = case Fetch of
+        nil ->
+            case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso
+                (Readers2 =:= [])  of
+            true ->
+                State#state{
+                    readers = Readers2,
+                    writer = spawn_writer(Target, Batch)
+                };
+            false ->
+                State#state{readers = Readers2}
+            end;
+        {From, FetchParams} ->
+            Reader = spawn_doc_reader(Source, Target, FetchParams),
+            gen_server:reply(From, ok),
+            State#state{
+                readers = [Reader | Readers2],
+                pending_fetch = nil
+            }
+        end,
+        {noreply, State2}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+    {stop, {process_died, Pid, Reason}, State}.
+
+
+terminate(_Reason, State) ->
+    close_db(State#state.source),
+    close_db(State#state.target),
+    stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
+    stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(MissingRevsQueue, 1) of
+    closed ->
+        ok;
+    {ok, [IdRevs]} ->
+        case Source of
+        #db{} ->
+            Source2 = open_db(Source),
+            Target2 = open_db(Target),
+            {Stats, HighSeqDone} = local_process_batch(
+                IdRevs, Source2, Target2, #batch{}, #rep_stats{}, ?LOWEST_SEQ),
+            close_db(Source2),
+            close_db(Target2),
+            ok = gen_server:cast(Parent, {report_seq_done, HighSeqDone, Stats}),
+            ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]);
+        #httpdb{} ->
+            remote_process_batch(IdRevs, Parent)
+        end,
+        queue_fetch_loop(Source, Target, Parent, MissingRevsQueue)
+    end.
+
+
+local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) ->
+    {Stats, HighestSeqDone};
+
+local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size},
+    Stats, HighestSeqDone) ->
+    case Target of
+    #httpdb{} ->
+        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+    #db{} ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
+    end,
+    {Written, WriteFailures} = flush_docs(Target, Docs),
+    Stats2 = Stats#rep_stats{
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+    },
+    {Stats2, HighestSeqDone};
+
+local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest],
+    Source, Target, Batch, Stats, HighestSeqSeen) ->
+    {ok, DocList} = fetch_doc(
+        Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2, []),
+    {Batch2, Written, WriteFailures} = lists:foldl(
+        fun(Doc, {Batch0, W0, F0}) ->
+            {Batch1, W, F} = maybe_flush_docs(Target, Batch0, Doc),
+            {Batch1, W0 + W, F0 + F}
+        end,
+        {Batch, 0, 0}, DocList),
+    Stats2 = Stats#rep_stats{
+        missing_checked = Stats#rep_stats.missing_checked + length(Revs)
+            + NotMissingCount,
+        missing_found = Stats#rep_stats.missing_found + length(Revs),
+        docs_read = Stats#rep_stats.docs_read + length(DocList),
+        docs_written = Stats#rep_stats.docs_written + Written,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
+    },
+    local_process_batch(
+        Rest, Source, Target, Batch2, Stats2, lists:max([Seq, HighestSeqSeen])).
+
+
+remote_process_batch([], Parent) ->
+    ok = gen_server:call(Parent, flush, infinity);
+
+remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) ->
+    case NotMissing > 0 of
+    true ->
+        ok = gen_server:call(Parent, {seq_done, Seq, NotMissing}, infinity);
+    false ->
+        ok
+    end,
+    % When the source is a remote database, we fetch a single document revision
+    % per HTTP request. This is mostly to facilitate retrying HTTP requests
+    % after transient network failures. It also helps avoid exceeding the
+    % maximum URL length allowed by proxies and Mochiweb.
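+    % For example, an entry {Seq, {Id, [R1, R2], 0, PAs}} results in two
+    % {fetch_doc, ...} calls below, one for revision R1 and one for R2.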
+    lists:foreach(
+        fun(Rev) ->
+            ok = gen_server:call(
+                Parent, {fetch_doc, {Id, [Rev], PAs, Seq}}, infinity)
+        end,
+        Revs),
+    remote_process_batch(Rest, Parent).
+
+
+spawn_doc_reader(Source, Target, FetchParams) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        Source2 = open_db(Source),
+        fetch_doc(
+            Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
+        close_db(Source2)
+    end).
+
+
+fetch_doc(Source, {Id, Revs, PAs, _Seq}, DocHandler, Acc) ->
+    couch_api_wrap:open_doc_revs(
+        Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc).
+
+
+local_doc_handler({ok, Doc}, DocList) ->
+    [Doc | DocList];
+local_doc_handler(_, DocList) ->
+    DocList.
+
+
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+    ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
+    Acc;
+remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+    % Immediately flush documents with attachments received from a remote
+    % source. The data property of each attachment is a function that starts
+    % streaming the attachment data from the remote source, so it is called
+    % as soon as possible to avoid ibrowse inactivity timeouts.
+    Target2 = open_db(Target),
+    Success = (flush_doc(Target2, Doc) =:= ok),
+    ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
+    close_db(Target2),
+    Acc;
+remote_doc_handler(_, Acc) ->
+    Acc.
+
+
+spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+    case {Target, Size > 0} of
+    {#httpdb{}, true} ->
+        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+    {#db{}, true} ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
+    _ ->
+        ok
+    end,
+    Parent = self(),
+    spawn_link(
+        fun() ->
+            Target2 = open_db(Target),
+            {Written, Failed} = flush_docs(Target2, DocList),
+            close_db(Target2),
+            ok = gen_server:call(
+                Parent, {add_write_stats, Written, Failed}, infinity)
+        end).
+
+
+after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter,
+        highest_seq_seen = HighSeqDone} = State) ->
+    ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}),
+    ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]),
+    gen_server:reply(Waiter, ok),
+    State#state{
+        stats = #rep_stats{},
+        flush_waiter = nil,
+        writer = nil,
+        batch = #batch{},
+        highest_seq_seen = ?LOWEST_SEQ
+    }.
+
+
+maybe_flush_docs(Doc, #state{target = Target, batch = Batch,
+        stats = Stats} = State) ->
+    {Batch2, W, F} = maybe_flush_docs(Target, Batch, Doc),
+    Stats2 = Stats#rep_stats{
+        docs_read = Stats#rep_stats.docs_read + 1,
+        docs_written = Stats#rep_stats.docs_written + W,
+        doc_write_failures = Stats#rep_stats.doc_write_failures + F
+    },
+    State#state{
+        stats = Stats2,
+        batch = Batch2
+    }.
+
+
+maybe_flush_docs(#httpdb{} = Target,
+    #batch{docs = DocAcc, size = SizeAcc} = Batch, #doc{atts = Atts} = Doc) ->
+    case (length(Atts) > ?MAX_BULK_ATTS_PER_DOC) orelse
+        lists:any(
+            fun(A) -> A#att.disk_len > ?MAX_BULK_ATT_SIZE end, Atts) of
+    true ->
+        ?LOG_DEBUG("Worker flushing doc with attachments", []),
+        case flush_doc(Target, Doc) of
+        ok ->
+            {Batch, 1, 0};
+        error ->
+            {Batch, 0, 1}
+        end;
+    false ->
+        JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+        case SizeAcc + iolist_size(JsonDoc) of
+        SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
+            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
+            {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]),
+            {#batch{}, Written, Failed};
+        SizeAcc2 ->
+            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, 0, 0}
+        end
+    end;
+
+maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc},
+    #doc{atts = []} = Doc) ->
+    case SizeAcc + 1 of
+    SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
+        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
+        {Written, Failed} = flush_docs(Target, [Doc | DocAcc]),
+        {#batch{}, Written, Failed};
+    SizeAcc2 ->
+        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, 0, 0}
+    end;
+
+maybe_flush_docs(#db{} = Target, Batch, Doc) ->
+    ?LOG_DEBUG("Worker flushing doc with attachments", []),
+    case flush_doc(Target, Doc) of
+    ok ->
+        {Batch, 1, 0};
+    error ->
+        {Batch, 0, 1}
+    end.
+
+
+flush_docs(_Target, []) ->
+    {0, 0};
+
+flush_docs(Target, DocList) when is_list(DocList) ->
+    {ok, Errors} = couch_api_wrap:update_docs(
+        Target, DocList, [delay_commit], replicated_changes),
+    DbUri = couch_api_wrap:db_uri(Target),
+    lists:foreach(
+        fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
+                ?LOG_ERROR("Replicator: unauthorized to write document"
+                    " `~s` to `~s`", [Id, DbUri]);
+            (_) ->
+                ok
+        end, Errors),
+    {length(DocList) - length(Errors), length(Errors)}.
+
+flush_doc(Target, Doc) ->
+    case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    {ok, _} ->
+        ok;
+    {error, <<"unauthorized">>} ->
+        ?LOG_ERROR("Replicator: unauthorized to write document `~s` to `~s`",
+            [Doc#doc.id, couch_api_wrap:db_uri(Target)]),
+        error;
+    _ ->
+        error
+    end.

Added: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,88 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_rev_finder).
+
+-export([start_link/5]).
+
+-include("couch_db.hrl").
+
+-import(couch_replicator_utils, [
+    open_db/1,
+    close_db/1
+]).
+
+
+start_link(Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize) ->
+    Pid = spawn_link(fun() ->
+        missing_revs_finder_loop(
+            Cp, Target, ChangesQueue, MissingRevsQueue, BatchSize)
+        end),
+    {ok, Pid}.
+
+
+missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize) ->
+    case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+    closed ->
+        ok;
+    {ok, DocInfos} ->
+        #doc_info{high_seq = ReportSeq} = lists:last(DocInfos),
+        ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
+        ?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]),
+        IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
+                    #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
+        Target2 = open_db(Target),
+        {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
+        close_db(Target2),
+        queue_missing_revs(Missing, DocInfos, RevsQueue),
+        missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize)
+    end.
+
+
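+% Build one work item per update sequence, of the form
+% {Id, MissingRevs, NotMissingCount, PAs}. Revisions the target already has
+% are not refetched, but their count is preserved so that the doc copier can
+% still report them as checked.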
+queue_missing_revs(Missing, DocInfos, Queue) ->
+    IdRevsSeqDict = dict:from_list(
+        [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
+            #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
+    AllDict = lists:foldl(
+        fun({Id, MissingRevs, PAs}, Acc) ->
+            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+            dict:store(Seq, {Id, MissingRevs, 0, PAs}, Acc)
+        end,
+        dict:new(), Missing),
+    AllDict2 = dict:fold(
+        fun(Id, {NotMissingRevs, Seq}, Acc) ->
+            case dict:find(Seq, Acc) of
+            error ->
+                dict:store(Seq, {Id, [], length(NotMissingRevs), []}, Acc);
+            {ok, {Id, MissingRevs, NotMissingCount, PAs}} ->
+                NotMissingCount2 = NotMissingCount + length(NotMissingRevs),
+                dict:store(Seq, {Id, MissingRevs, NotMissingCount2, PAs}, Acc)
+            end
+        end,
+        AllDict, non_missing(IdRevsSeqDict, Missing)),
+    ?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue",
+        [dict:size(AllDict2)]),
+    ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)).
+
+
+non_missing(NonMissingDict, []) ->
+    NonMissingDict;
+non_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
+    {AllRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+    case AllRevs -- MissingRevs of
+    [] ->
+        non_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+    NotMissing ->
+        non_missing(
+            dict:store(MissingId, {NotMissing, Seq}, IdRevsSeqDict),
+            Rest)
+    end.

Added: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (added)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,382 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_utils).
+
+-export([parse_rep_doc/2]).
+-export([update_rep_doc/2]).
+-export([ensure_rep_db_exists/0]).
+-export([open_db/1, close_db/1]).
+-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
+-export([replication_id/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+-include("couch_js_functions.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+
+parse_rep_doc({Props} = RepObj, UserCtx) ->
+    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
+    Options = make_options(Props),
+    Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
+    Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
+    Rep = #rep{
+        source = Source,
+        target = Target,
+        options = Options,
+        user_ctx = UserCtx,
+        doc = RepObj
+    },
+    {ok, Rep#rep{id = replication_id(Rep)}}.
+
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+    case get_value(<<"_id">>, Props) of
+    undefined ->
+        ok;
+    RepDocId ->
+        {ok, RepDb} = ensure_rep_db_exists(),
+        case couch_db:open_doc(RepDb, RepDocId, []) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end,
+        couch_db:close(RepDb)
+    end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({<<"_replication_state">> = K, _V} = KV, Body) ->
+                Body1 = lists:keystore(K, 1, Body, KV),
+                {Mega, Secs, _} = erlang:now(),
+                UnixTime = Mega * 1000000 + Secs,
+                lists:keystore(
+                    <<"_replication_state_time">>, 1,
+                    Body1, {<<"_replication_state_time">>, UnixTime});
+            ({K, _V} = KV, Body) ->
+                lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody,
+        KVs
+    ),
+    % Might not succeed: the replication doc may have been deleted right
+    % before this update (which is not an error).
+    couch_db:update_doc(
+        RepDb,
+        RepDoc#doc{body = {NewRepDocBody}},
+        []).
+
+
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    Opts = [
+        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+        sys_db
+    ],
+    case couch_db:open(DbName, Opts) of
+    {ok, Db} ->
+        Db;
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, Opts)
+    end,
+    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Doc} ->
+        ok;
+    _ ->
+        DDoc = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ]}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+    end,
+    ok.
+
+
+replication_id(#rep{options = Options} = Rep) ->
+    BaseId = replication_id(Rep, ?REP_ID_VERSION),
+    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
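+% Version 2 hashes the local hostname, the httpd port, and both endpoints,
+% plus the filter code and query parameters (or the doc_ids list) when
+% present; version 1 did not include the port.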
+replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+    {ok, HostName} = inet:gethostname(),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    maybe_append_filters([HostName, Port, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+    {ok, HostName} = inet:gethostname(),
+    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
+maybe_append_filters(Base,
+        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+    Base2 = Base ++
+        case get_value(filter, Options) of
+        undefined ->
+            case get_value(doc_ids, Options) of
+            undefined ->
+                [];
+            DocIds ->
+                [DocIds]
+            end;
+        Filter ->
+            [filter_code(Filter, Source, UserCtx),
+                get_value(query_params, Options, {[]})]
+        end,
+    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+
+filter_code(Filter, Source, UserCtx) ->
+    {match, [DDocName, FilterName]} =
+        re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
+    {ok, Db} = couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}]),
+    try
+        {ok, #doc{body = Body}} =
+            couch_api_wrap:open_doc(Db, <<"_design/", DDocName/binary>>, []),
+        Code = couch_util:get_nested_json_value(
+            Body, [<<"filters">>, FilterName]),
+        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
+    after
+        couch_api_wrap:db_close(Db)
+    end.
+
+
+maybe_append_options(Options, RepOptions) ->
+    lists:foldl(fun(Option, Acc) ->
+        Acc ++
+        case get_value(Option, RepOptions, false) of
+        true ->
+            "+" ++ atom_to_list(Option);
+        false ->
+            ""
+        end
+    end, [], Options).
+
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+    DefaultHeaders = (#httpdb{})#httpdb.headers,
+    case OAuth of
+    nil ->
+        {remote, Url, Headers -- DefaultHeaders};
+    #oauth{} ->
+        {remote, Url, Headers -- DefaultHeaders, OAuth}
+    end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+
+parse_rep_db({Props}, ProxyParams, Options) ->
+    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+    DefaultHeaders = (#httpdb{})#httpdb.headers,
+    OAuth = case get_value(<<"oauth">>, AuthProps) of
+    undefined ->
+        nil;
+    {OauthProps} ->
+        #oauth{
+            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+            token = ?b2l(get_value(<<"token">>, OauthProps)),
+            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
+            signature_method =
+                case get_value(<<"signature_method">>, OauthProps) of
+                undefined ->        hmac_sha1;
+                <<"PLAINTEXT">> ->  plaintext;
+                <<"HMAC-SHA1">> ->  hmac_sha1;
+                <<"RSA-SHA1">> ->   rsa_sha1
+                end
+        }
+    end,
+    #httpdb{
+        url = Url,
+        oauth = OAuth,
+        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+        ibrowse_options = lists:keysort(1,
+            [{socket_options, get_value(socket_options, Options)} |
+                ProxyParams ++ ssl_params(Url)]),
+        timeout = get_value(connection_timeout, Options),
+        http_connections = get_value(http_connections, Options),
+        http_pipeline_size = get_value(http_pipeline_size, Options)
+    };
+parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
+    DbName.
+
+
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+    maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+    case lists:last(Url) of
+    $/ ->
+        Url;
+    _ ->
+        Url ++ "/"
+    end.
+
+
+make_options(Props) ->
+    Options = lists:ukeysort(1, convert_options(Props)),
+    DefWorkers = couch_config:get("replicator", "worker_processes", "4"),
+    DefBatchSize = couch_config:get("replicator", "worker_batch_size", "1000"),
+    DefConns = couch_config:get("replicator", "http_connections", "20"),
+    DefPipeSize = couch_config:get("replicator", "http_pipeline_size", "50"),
+    DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
+    {ok, DefSocketOptions} = couch_util:parse_term(
+        couch_config:get("replicator", "socket_options",
+            "[{keepalive, true}, {nodelay, false}]")),
+    lists:ukeymerge(1, Options, [
+        {connection_timeout, list_to_integer(DefTimeout)},
+        {http_connections, list_to_integer(DefConns)},
+        {http_pipeline_size, list_to_integer(DefPipeSize)},
+        {socket_options, DefSocketOptions},
+        {worker_batch_size, list_to_integer(DefBatchSize)},
+        {worker_processes, list_to_integer(DefWorkers)}
+    ]).
+
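+% The defaults above correspond to a local.ini section like the following
+% (a sketch showing only the settings read by make_options/1):
+%
+%   [replicator]
+%   worker_processes = 4
+%   worker_batch_size = 1000
+%   http_connections = 20
+%   http_pipeline_size = 50
+%   connection_timeout = 30000
+%   socket_options = [{keepalive, true}, {nodelay, false}]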
+
+convert_options([])->
+    [];
+convert_options([{<<"cancel">>, V} | R]) ->
+    [{cancel, V} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | R]) ->
+    [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | R]) ->
+    [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+    [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+    [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    % Ensure the same behaviour as the old replicator: accept a list of
+    % percent-encoded doc IDs.
+    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+    [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"worker_processes">>, V} | R]) ->
+    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"worker_batch_size">>, V} | R]) ->
+    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_connections">>, V} | R]) ->
+    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_pipeline_size">>, V} | R]) ->
+    [{http_pipeline_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"connection_timeout">>, V} | R]) ->
+    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"socket_options">>, V} | R]) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    [{socket_options, SocketOptions} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
+    convert_options(R).
+
+
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+    parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+    [];
+parse_proxy_params(ProxyUrl) ->
+    #url{
+        host = Host,
+        port = Port,
+        username = User,
+        password = Passwd
+    } = ibrowse_lib:parse_url(ProxyUrl),
+    [{proxy_host, Host}, {proxy_port, Port}] ++
+        case is_list(User) andalso is_list(Passwd) of
+        false ->
+            [];
+        true ->
+            [{proxy_user, User}, {proxy_password, Passwd}]
+        end.
+
+
+ssl_params(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+    #url{protocol = https} ->
+        Depth = list_to_integer(
+            couch_config:get("replicator", "ssl_certificate_max_depth", "3")
+        ),
+        VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
+        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
+        [{is_ssl, true}, {ssl_options, SslOpts}];
+    #url{protocol = http} ->
+        []
+    end.
+
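+% The ssl application changed its verification options in OTP R14: newer
+% releases take the atoms verify_peer/verify_none, while older ones expect
+% the numeric levels 2 and 0.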
+ssl_verify_options(Value) ->
+    ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+    [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
+    [{verify, verify_none}];
+ssl_verify_options(true, _OTPVersion) ->
+    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+    [{verify, 2}, {cacertfile, CAFile}];
+ssl_verify_options(false, _OTPVersion) ->
+    [{verify, 0}].
+
+
+open_db(#db{name = Name, user_ctx = UserCtx, options = Options}) ->
+    {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | Options]),
+    Db;
+open_db(HttpDb) ->
+    HttpDb.
+
+
+close_db(#db{} = Db) ->
+    couch_db:close(Db);
+close_db(_HttpDb) ->
+    ok.
+
+
+start_db_compaction_notifier(#db{name = DbName}, Server) ->
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({compacted, DbName1}) when DbName1 =:= DbName ->
+                ok = gen_server:cast(Server, {db_compacted, DbName});
+            (_) ->
+                ok
+        end),
+    Notifier;
+start_db_compaction_notifier(_, _) ->
+    nil.
+
+
+stop_db_compaction_notifier(nil) ->
+    ok;
+stop_db_compaction_notifier(Notifier) ->
+    couch_db_update_notifier:stop(Notifier).

Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Wed Feb 16 20:05:31 2011
@@ -59,8 +59,8 @@ close(Wq) ->
 
 init(Options) ->
     Q = #q{
-        max_size = couch_util:get_value(max_size, Options),
-        max_items = couch_util:get_value(max_items, Options),
+        max_size = couch_util:get_value(max_size, Options, nil),
+        max_items = couch_util:get_value(max_items, Options, nil),
         multi_workers = couch_util:get_value(multi_workers, Options, false)
     },
     {ok, Q}.
@@ -71,7 +71,7 @@ terminate(_Reason, #q{work_waiters=Worke
 
     
 handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) ->
-    Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)),
+    Q = Q0#q{size = increment_queue_size(Q0, Item),
                 items = Q0#q.items + 1,
                 queue = queue:in(Item, Q0#q.queue)},
     case (Q#q.size >= Q#q.max_size) orelse
@@ -153,3 +153,10 @@ code_change(_OldVsn, State, _Extra) ->
 
 handle_info(X, Q) ->
     {stop, X, Q}.
+
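+% Track the queue's byte size only when a max_size limit was given. Binary
+% items are measured directly; other terms via term_to_binary/1.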
+increment_queue_size(#q{max_size = nil, size = Size}, _Item) ->
+    Size;
+increment_queue_size(#q{size = Size}, Item) when is_binary(Item) ->
+    Size + byte_size(Item);
+increment_queue_size(#q{size = Size}, Item) ->
+    Size + byte_size(term_to_binary(Item)).

Added: couchdb/trunk/src/couchdb/json_stream_parse.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/json_stream_parse.erl?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/json_stream_parse.erl (added)
+++ couchdb/trunk/src/couchdb/json_stream_parse.erl Wed Feb 16 20:05:31 2011
@@ -0,0 +1,432 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(json_stream_parse).
+
+
+-export([events/2, to_ejson/1, collect_object/2]).
+
+-define(IS_WS(X), (X == $\  orelse X == $\t orelse X == $\n orelse X == $\r)).
+-define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)).
+-define(IS_DIGIT(X), (X >= $0 andalso X =< $9)).
+
+
+
+% Parses the JSON into events.
+%
+% The DataFun param is a function that produces the data for parsing. When
+% called, it must yield either a tuple or the atom done. The first element in
+% the tuple is the data itself, and the second element is a function to be
+% called next to get the next chunk of data in the stream.
+%
+% The EventFun is called every time a JSON element is parsed. It must produce
+% a new function to be called for the next event.
+%
+% Events happen each time a new element in the JSON string is parsed.
+% For simple value types, the data itself is returned:
+% Strings
+% Integers
+% Floats
+% true
+% false
+% null
+%
+% For arrays, the start of the array is signaled by the event array_start
+% atom. The end is signaled by array_end. The events before the end are the
+% values, or nested values.
+%
+% For objects, the start of the object is signaled by the event object_start
+% atom. The end is signaled by object_end. Each key is signaled by
+% {key, KeyString}, and the following event is the value, or start of the
+% value (array_start, object_start).
+%
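+% For example, parsing the string {"a":[1]} produces the event sequence:
+% object_start, {key, <<"a">>}, array_start, 1, array_end, object_end.
+%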
+events(Data,EventFun) when is_list(Data)->
+    events(list_to_binary(Data),EventFun);
+events(Data,EventFun) when is_binary(Data)->
+    events(fun() -> {Data, fun() -> done end} end,EventFun);
+events(DataFun,EventFun) ->
+    parse_one(DataFun, EventFun, <<>>).
+
+% Converts the JSON directly to the Erlang representation of JSON (EJSON).
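+%
+% For example (a sketch of an interactive session):
+%
+%   1> json_stream_parse:to_ejson(<<"{\"foo\":[1,true]}">>).
+%   {[{<<"foo">>,[1,true]}]}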
+to_ejson(DF) ->
+    {_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end),
+    [[EJson]] = make_ejson(EF(get_results), [[]]),
+    EJson.
+
+
+% This function is used to return complete objects while parsing streams.
+%
+% Return this function from inside an event function right after getting an
+% object_start event. It then collects the remaining events for that object
+% and converts them to the Erlang representation of JSON.
+%
+% It then calls your ReturnControl function with the Erlang object. Your
+% ReturnControl function should then yield another event function.
+%
+% This example stream parses an array of objects, calling
+% fun do_something_with_the_object/1 for each object.
+%
+%    ev_array(array_start) ->
+%        fun(Ev) -> ev_object_loop(Ev) end.
+%
+%    ev_object_loop(object_start) ->
+%        fun(Ev) ->
+%            json_stream_parse:collect_object(Ev,
+%                fun(Obj) ->
+%                    do_something_with_the_object(Obj),
+%                    fun(Ev2) -> ev_object_loop(Ev2) end
+%                end)
+%        end;
+%    ev_object_loop(array_end) ->
+%        ok.
+%
+%    % invoke the parse
+%    main() ->
+%        ...
+%        events(Data, fun(Ev) -> ev_array(Ev) end).
+
+collect_object(Ev, ReturnControl) ->
+    collect_object(Ev, 0, ReturnControl, [object_start]).
+
+
+
+% internal methods
+
+parse_one(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+        none;
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            {DF3, EF3(object_end), Rest2};
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            {DF3, EF3(array_end), Rest2};
+        Int when is_integer(Int)->
+            {DF2, EF(Int), Rest};
+        Float when is_float(Float)->
+            {DF2, EF(Float), Rest};
+        Atom when is_atom(Atom)->
+            {DF2, EF(Atom), Rest};
+        String when is_binary(String)->
+            {DF2, EF(String), Rest};
+        _OtherToken ->
+            err(unexpected_token)
+        end
+    end.
+
+must_parse_one(DF,EF,Acc,Error)->
+    case parse_one(DF, EF, Acc) of
+    none ->
+        err(Error);
+    Else ->
+        Else
+    end.
+
+must_toke(DF, Data, Error) ->
+    case toke(DF, Data) of
+    none ->
+        err(Error);
+    Result ->
+        Result
+    end.
+
+toke(DF, <<>>) ->
+    case DF() of
+    done ->
+        none;
+    {Data, DF2} ->
+        toke(DF2, Data)
+    end;
+toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
+    toke(DF, Rest);
+toke(DF, <<${,Rest/binary>>) ->
+    {"{", DF, Rest};
+toke(DF, <<$},Rest/binary>>) ->
+    {"}", DF, Rest};
+toke(DF, <<$[,Rest/binary>>) ->
+    {"[", DF, Rest};
+toke(DF, <<$],Rest/binary>>) ->
+    {"]", DF, Rest};
+toke(DF, <<$",Rest/binary>>) ->
+    toke_string(DF,Rest,[]);
+toke(DF, <<$,,Rest/binary>>) ->
+    {",", DF, Rest};
+toke(DF, <<$:,Rest/binary>>) ->
+    {":", DF, Rest};
+toke(DF, <<$-,Rest/binary>>) ->
+    {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
+    case ?IS_DIGIT(C) of
+    true ->
+        toke_number_leading(DF2, Data, "-");
+    false ->
+        err(expected_number)
+    end;
+toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
+    toke_number_leading(DF, Data, []);
+toke(DF, <<$t,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"rue">>, DF, Rest),
+    {true, DF2, Data};
+toke(DF, <<$f,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"alse">>, DF, Rest),
+    {false, DF2, Data};
+toke(DF, <<$n,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"ull">>, DF, Rest),
+    {null, DF2, Data};
+toke(_, _) ->
+    err(bad_token).
+
+
+must_match(Pattern, DF, Data) ->
+    Size = size(Pattern),
+    case must_df(DF, Size, Data, bad_token) of
+    {<<Pattern:Size/binary,Data2/binary>>, DF2} ->
+        {Data2, DF2};
+    {_, _} ->
+        err(bad_token)
+    end.
+
+must_df(DF,Error)->
+    case DF() of
+    done ->
+        err(Error);
+    {Data, DF2} ->
+        {Data, DF2}
+    end.
+
+
+must_df(DF,NeedLen,Acc,Error)->
+    if size(Acc) >= NeedLen ->
+        {Acc, DF};
+    true ->
+        case DF() of
+        done ->
+            err(Error);
+        {Data, DF2} ->
+            must_df(DF2, NeedLen, <<Acc/binary, Data/binary>>, Error)
+        end
+    end.
+
+
+parse_object(DF,EF,Acc) ->
+    case must_toke(DF, Acc, unterminated_object) of
+    {String, DF2, Rest} when is_binary(String)->
+        EF2 = EF({key,String}),
+        case must_toke(DF2,Rest,unterminated_object) of
+        {":", DF3, Rest2} ->
+            {DF4, EF3, Rest3} = must_parse_one(DF3, EF2, Rest2, expected_value),
+            case must_toke(DF4,Rest3, unterminated_object) of
+            {",", DF5, Rest4} ->
+                parse_object(DF5, EF3, Rest4);
+            {"}", DF5, Rest4} ->
+                {DF5, EF3, Rest4};
+            {_, _, _} ->
+                err(unexpected_token)
+            end;
+        _Else ->
+            err(expected_colon)
+        end;
+    {"}", DF2, Rest} ->
+        {DF2, EF, Rest};
+    {_, _, _} ->
+        err(unexpected_token)
+    end.
+
+parse_array0(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+        err(unterminated_array);
+    {",", DF2, Rest} ->
+        parse_array(DF2,EF,Rest);
+    {"]", DF2, Rest} ->
+        {DF2,EF,Rest};
+    _ ->
+        err(unexpected_token)
+    end.
+
+parse_array(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+         err(unterminated_array);
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(object_end), Rest2);
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(array_end), Rest2);
+        Int when is_integer(Int)->
+            parse_array0(DF2, EF(Int), Rest);
+        Float when is_float(Float)->
+            parse_array0(DF2, EF(Float), Rest);
+        Atom when is_atom(Atom)->
+            parse_array0(DF2, EF(Atom), Rest);
+        String when is_binary(String)->
+            parse_array0(DF2, EF(String), Rest);
+        "]" ->
+            {DF2, EF, Rest};
+        _ ->
+            err(unexpected_token)
+        end
+    end.
+
+
+toke_string(DF, <<>>, Acc) ->
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, Data, Acc);
+toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$" | Acc]);
+toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\\ | Acc]);
+toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$/ | Acc]);
+toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\b | Acc]);
+toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\f | Acc]);
+toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\n | Acc]);
+toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\r | Acc]);
+toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\t | Acc]);
+toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
+    {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
+    UTFChar = erlang:list_to_integer([A, B, C, D], 16),
+    if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
+        err(invalid_utf_char);
+    true ->
+        ok
+    end,
+    Chars = xmerl_ucs:to_utf8(UTFChar),
+    toke_string(DF2, Data, lists:reverse(Chars) ++ Acc);
+toke_string(DF, <<$\\>>, Acc) ->
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, <<$\\,Data/binary>>, Acc);
+toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
+    err(bad_escape);
+toke_string(DF, <<$", Rest/binary>>, Acc) ->
+    {list_to_binary(lists:reverse(Acc)), DF, Rest};
+toke_string(DF, <<C, Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [C | Acc]).
+
+
+toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_leading(DF, Rest, [Digit | Acc]);
+toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_integer(lists:reverse(Acc)), DF, Rest};
+toke_number_leading(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+         {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_leading(DF2, Data, Acc)
+    end;
+toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
+    toke_number_trailing(DF, Rest, [$.|Acc]);
+toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(_, _, _) ->
+    err(unexpected_character_in_number).
+
+toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_trailing(DF, Rest, [Digit | Acc]);
+toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_trailing(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_trailing(DF2, Data, Acc)
+    end;
+toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(_, _, _) ->
+    err(unexpected_character_in_number).
+
+
+toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
+    toke_number_exponent(DF, Rest, [Digit | Acc]);
+toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
+        when Sign == $+ orelse Sign == $- ->
+    toke_number_exponent(DF, Rest, [Sign | Acc]);
+toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_exponent(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_exponent(DF2, Data, Acc)
+    end;
+toke_number_exponent(_, _, _) ->
+    err(unexpected_character_in_number).
+
+
+err(Error)->
+    throw({parse_error,Error}).
+
+
+make_ejson([], Stack) ->
+    Stack;
+make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
+make_ejson([array_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
+make_ejson([object_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
+make_ejson([Value | RevEvs], [Vals | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[Value | Vals] | RestStack]).
+
+collect_events(get_results, Acc) ->
+    Acc;
+collect_events(Ev, Acc) ->
+    fun(NextEv) -> collect_events(NextEv, [Ev | Acc]) end.
+
+
+collect_object(object_end, 0, ReturnControl, Acc) ->
+    [[Obj]] = make_ejson([object_end | Acc], [[]]),
+    ReturnControl(Obj);
+collect_object(object_end, NestCount, ReturnControl, Acc) ->
+    fun(Ev) ->
+        collect_object(Ev, NestCount - 1, ReturnControl, [object_end | Acc])
+    end;
+collect_object(object_start, NestCount, ReturnControl, Acc) ->
+    fun(Ev) ->
+        collect_object(Ev, NestCount + 1, ReturnControl, [object_start | Acc])
+    end;
+collect_object(Ev, NestCount, ReturnControl, Acc) ->
+    fun(Ev2) ->
+        collect_object(Ev2, NestCount, ReturnControl, [Ev | Acc])
+    end.

Modified: couchdb/trunk/test/etap/001-load.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/001-load.t?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/test/etap/001-load.t (original)
+++ couchdb/trunk/test/etap/001-load.t Wed Feb 16 20:05:31 2011
@@ -17,7 +17,7 @@
 
 main(_) ->
     test_util:init_code_path(),
-    etap:plan(37),
+    etap:plan(40),
     Modules = [
         couch_btree,
         couch_config,
@@ -43,7 +43,10 @@ main(_) ->
         couch_os_process,
         couch_query_servers,
         couch_ref_counter,
-        couch_rep,
+        couch_replicator,
+        couch_replicator_doc_copier,
+        couch_replicator_rev_finder,
+        couch_replicator_utils,
         couch_rep_sup,
         couch_server,
         couch_server_sup,

Added: couchdb/trunk/test/etap/190-json-stream-parse.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/190-json-stream-parse.t?rev=1071375&view=auto
==============================================================================
--- couchdb/trunk/test/etap/190-json-stream-parse.t (added)
+++ couchdb/trunk/test/etap/190-json-stream-parse.t Wed Feb 16 20:05:31 2011
@@ -0,0 +1,184 @@
+#!/usr/bin/env escript
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+    test_util:init_code_path(),
+    etap:plan(99),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag("Test died abnormally: ~p", [Other]),
+            etap:bail("Bad return value.")
+    end,
+    ok.
+
+test() ->
+    crypto:start(),
+    ok = test_raw_json_input(),
+    ok = test_1_byte_data_function(),
+    ok = test_multiple_bytes_data_function().
+
+
+test_raw_json_input() ->
+    etap:diag("Tests with raw JSON string as the input."),
+    lists:foreach(
+        fun({EJson, JsonString, Desc}) ->
+            etap:is(
+              equiv(EJson, json_stream_parse:to_ejson(JsonString)),
+              true,
+              Desc)
+        end,
+        cases()),
+    ok.
+
+
+test_1_byte_data_function() ->
+    etap:diag("Tests with a 1 byte output data function as the input."),
+    lists:foreach(
+        fun({EJson, JsonString, Desc}) ->
+            DataFun = fun() -> single_byte_data_fun(JsonString) end,
+            etap:is(
+              equiv(EJson, json_stream_parse:to_ejson(DataFun)),
+              true,
+              Desc)
+        end,
+        cases()),
+    ok.
+
+
+test_multiple_bytes_data_function() ->
+    etap:diag("Tests with a multiple bytes output data function as the input."),
+    lists:foreach(
+        fun({EJson, JsonString, Desc}) ->
+            DataFun = fun() -> multiple_bytes_data_fun(JsonString) end,
+            etap:is(
+              equiv(EJson, json_stream_parse:to_ejson(DataFun)),
+              true,
+              Desc)
+        end,
+        cases()),
+    ok.
+
+
+cases() ->
+    [
+        {1, "1", "integer numeric literial"},
+        {3.1416, "3.14160", "float numeric literal"},  % text representation may truncate, trail zeroes
+        {-1, "-1", "negative integer numeric literal"},
+        {-3.1416, "-3.14160", "negative float numeric literal"},
+        {12.0e10, "1.20000e+11", "float literal in scientific notation"},
+        {1.234E+10, "1.23400e+10", "another float literal in scientific notation"},
+        {-1.234E-10, "-1.23400e-10", "negative float literal in scientific notation"},
+        {10.0, "1.0e+01", "yet another float literal in scientific notation"},
+        {123.456, "1.23456E+2", "yet another float literal in scientific notation"},
+        {10.0, "1e1", "yet another float literal in scientific notation"},
+        {<<"foo">>, "\"foo\"", "string literal"},
+        {<<"foo", 5, "bar">>, "\"foo\\u0005bar\"", "string literal with \\u0005"},
+        {<<"">>, "\"\"", "empty string literal"},
+        {<<"\n\n\n">>, "\"\\n\\n\\n\"", "only new lines literal"},
+        {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\"",
+            "only white spaces string literal"},
+        {null, "null", "null literal"},
+        {true, "true", "true literal"},
+        {false, "false", "false literal"},
+        {<<"null">>, "\"null\"", "null string literal"},
+        {<<"true">>, "\"true\"", "true string literal"},
+        {<<"false">>, "\"false\"", "false string literal"},
+        {{[]}, "{}", "empty object literal"},
+        {{[{<<"foo">>, <<"bar">>}]}, "{\"foo\":\"bar\"}",
+            "simple object literal"},
+        {{[{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]},
+            "{\"foo\":\"bar\",\"baz\":123}", "another simple object literal"},
+        {[], "[]", "empty array literal"},
+        {[[]], "[[]]", "empty array literal inside a single element array literal"},
+        {[1, <<"foo">>], "[1,\"foo\"]", "simple non-empty array literal"},
+        {[1199344435545.0, 1], "[1199344435545.0,1]",
+             "another simple non-empty array literal"},
+        {[false, true, 321, null], "[false, true, 321, null]", "array of literals"},
+        {{[{<<"foo">>, [123]}]}, "{\"foo\":[123]}",
+             "object literal with an array valued property"},
+        {{[{<<"foo">>, {[{<<"bar">>, true}]}}]},
+            "{\"foo\":{\"bar\":true}}", "nested object literal"},
+        {{[{<<"foo">>, []}, {<<"bar">>, {[{<<"baz">>, true}]}},
+                {<<"alice">>, <<"bob">>}]},
+            "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}",
+            "complex object literal"},
+        {[-123, <<"foo">>, {[{<<"bar">>, []}]}, null],
+            "[-123,\"foo\",{\"bar\":[]},null]",
+            "complex array literal"}
+    ].
+
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as Erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+equiv({Props1}, {Props2}) ->
+    equiv_object(Props1, Props2);
+equiv(L1, L2) when is_list(L1), is_list(L2) ->
+    equiv_list(L1, L2);
+equiv(N1, N2) when is_number(N1), is_number(N2) ->
+    N1 == N2;
+equiv(B1, B2) when is_binary(B1), is_binary(B2) ->
+    B1 == B2;
+equiv(true, true) ->
+    true;
+equiv(false, false) ->
+    true;
+equiv(null, null) ->
+    true.
+
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+equiv_object(Props1, Props2) ->
+    L1 = lists:keysort(1, Props1),
+    L2 = lists:keysort(1, Props2),
+    Pairs = lists:zip(L1, L2),
+    true = lists:all(
+        fun({{K1, V1}, {K2, V2}}) ->
+            equiv(K1, K2) andalso equiv(V1, V2)
+        end,
+        Pairs).
+
+
+%% Recursively compare list elements for equivalence.
+equiv_list([], []) ->
+    true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+    equiv(V1, V2) andalso equiv_list(L1, L2).
+
+
+single_byte_data_fun([]) ->
+    done;
+single_byte_data_fun([H | T]) ->
+    {<<H>>, fun() -> single_byte_data_fun(T) end}.
+
+
+multiple_bytes_data_fun([]) ->
+    done;
+multiple_bytes_data_fun(L) ->
+    N = crypto:rand_uniform(0, 7),
+    {Part, Rest} = split(L, N),
+    {list_to_binary(Part), fun() -> multiple_bytes_data_fun(Rest) end}.
+
+split(L, N) when length(L) =< N ->
+    {L, []};
+split(L, N) ->
+    take(N, L, []).
+
+take(0, L, Acc) ->
+    {lists:reverse(Acc), L};
+take(N, [H|L], Acc) ->
+    take(N - 1, L, [H | Acc]).

Modified: couchdb/trunk/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/Makefile.am?rev=1071375&r1=1071374&r2=1071375&view=diff
==============================================================================
--- couchdb/trunk/test/etap/Makefile.am (original)
+++ couchdb/trunk/test/etap/Makefile.am Wed Feb 16 20:05:31 2011
@@ -57,10 +57,6 @@ EXTRA_DIST = \
     083-config-no-files.t \
     090-task-status.t \
     100-ref-counter.t \
-    110-replication-httpc.t \
-    111-replication-changes-feed.t \
-    112-replication-missing-revs.t \
-    113-replication-attachment-comp.t \
     120-stats-collect.t \
     121-stats-aggregates.cfg \
     121-stats-aggregates.ini \
@@ -81,4 +77,5 @@ EXTRA_DIST = \
     173-os-daemon-cfg-register.es \
     173-os-daemon-cfg-register.t \
     180-http-proxy.ini \
-    180-http-proxy.t
+    180-http-proxy.t \
+    190-json-stream-parse.t


