couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [27/49] Remove src/fabric
Date Wed, 05 Feb 2014 14:50:49 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
deleted file mode 100644
index b0a3628..0000000
--- a/src/fabric/src/fabric_view_changes.erl
+++ /dev/null
@@ -1,422 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_changes).
-
--export([go/5, pack_seqs/1, unpack_seqs/2]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--import(fabric_db_update_listener, [wait_db_updated/1]).
-
-go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
-        Feed == "longpoll" -> % feeds that keep waiting for updates after the first pass
-    Args = make_changes_args(Options),
-    Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of % check the start sequence before Callback(start, ...)
-    ok ->
-        {ok, Acc} = Callback(start, Acc0),
-        {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
-        Ref = make_ref(),
-        Parent = self(),
-        UpdateListener = {spawn_link(fabric_db_update_listener, go, % linked listener process, paired with Ref
-                                     [Parent, Ref, DbName, Timeout]),
-                          Ref},
-        try
-            keep_sending_changes(
-                DbName,
-                Args,
-                Callback,
-                Since,
-                Acc,
-                Timeout,
-                UpdateListener,
-                os:timestamp() % T0: start time used to measure how long the feed has run
-            )
-        after
-            fabric_db_update_listener:stop(UpdateListener) % runs whether the loop returns or crashes
-        end;
-    Error ->
-        Callback(Error, Acc0) % validation failed: hand the error term to the callback
-    end;
-
-go(DbName, "normal", Options, Callback, Acc0) -> % a single send_changes pass, then stop
-    Args = make_changes_args(Options),
-    Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
-    ok ->
-        {ok, Acc} = Callback(start, Acc0),
-        {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
-            DbName,
-            Args,
-            Callback,
-            Since,
-            Acc,
-            5000 % Timeout argument of send_changes (presumably milliseconds -- TODO confirm)
-        ),
-        Callback({stop, pack_seqs(Seqs)}, AccOut); % report the packed end sequence
-    Error ->
-        Callback(Error, Acc0)
-    end.
-
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) -> % loop: send a batch, wait for an update, repeat
-    #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
-    {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
-    #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
-    LastSeq = pack_seqs(NewSeqs),
-    if Limit > Limit2, Feed == "longpoll" -> % longpoll ends as soon as this pass consumed some of the limit
-        Callback({stop, LastSeq}, AccOut);
-    true ->
-        WaitForUpdate = wait_db_updated(UpListen),
-        AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000, % microseconds -> milliseconds since T0
-        Max = case config:get("fabric", "changes_duration") of % optional cap compared against AccumulatedTime
-        undefined ->
-            infinity; % no cap: in Erlang term order a number is never greater than an atom
-        MaxStr ->
-            list_to_integer(MaxStr)
-        end,
-        case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
-        {undefined, _, timeout} -> % no heartbeat configured and the wait timed out: end the feed
-            Callback({stop, LastSeq}, AccOut);
-        {_, true, timeout} -> % duration cap exceeded and the wait timed out: end the feed
-            Callback({stop, LastSeq}, AccOut);
-        _ -> % otherwise call Callback(timeout, ...) and go around again
-            {ok, AccTimeout} = Callback(timeout, AccOut),
-            keep_sending_changes(
-                DbName,
-                Args#changes_args{limit=Limit2}, % carry the remaining limit into the next pass
-                Callback,
-                LastSeq,
-                AccTimeout,
-                Timeout,
-                UpListen,
-                T0
-            )
-        end
-    end.
-
-send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> % cast a changes job per shard and collect the replies
-    LiveNodes = [node() | nodes()],
-    AllLiveShards = mem3:live_shards(DbName, LiveNodes),
-    Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
-        case lists:member(Shard, AllLiveShards) of % is the shard named in the sequence still live?
-        true ->
-            Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), % resume from the recorded Seq
-            [{Shard#shard{ref = Ref}, Seq}];
-        false ->
-            % Find some replacement shards to cover the missing range
-            % TODO It's possible in rare cases of shard merging to end up
-            % with overlapping shard ranges from this technique
-            lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
-                Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), % replacements start from 0
-                {NewShard#shard{ref = Ref}, 0}
-            end, find_replacement_shards(Shard, AllLiveShards))
-        end
-    end, unpack_seqs(PackedSeqs, DbName)),
-    {Workers, _} = lists:unzip(Seqs), % Workers: the shard records that now carry rexi refs
-    RexiMon = fabric_util:create_monitors(Workers),
-    State = #collector{
-        query_args = ChangesArgs,
-        callback = Callback,
-        counters = orddict:from_list(Seqs),
-        user_acc = AccIn,
-        limit = ChangesArgs#changes_args.limit,
-        rows = Seqs % store sequence positions instead
-    },
-    %% TODO: errors need to be handled here
-    try
-        receive_results(Workers, State, Timeout, Callback)
-    after
-        rexi_monitor:stop(RexiMon), % cleanup runs whether receive_results returns or crashes
-        fabric_util:cleanup(Workers)
-    end.
-
-receive_results(Workers, State, Timeout, Callback) -> % receive worker messages until a non-timeout result comes back
-    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
-            infinity, Timeout) of
-    {timeout, NewState0} -> % on timeout: call Callback(timeout, ...) and keep receiving
-        {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc),
-        NewState = NewState0#collector{user_acc = AccOut},
-        receive_results(Workers, NewState, Timeout, Callback);
-    {_, NewState} -> % any other tag is discarded and the second element returned as {ok, NewState}
-        {ok, NewState}
-    end.
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) -> % node down: delegate to fabric_view:remove_down_shards/2
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) -> % a worker exited: forget it, then check coverage
-    #collector{
-        callback=Callback,
-        counters=Counters0,
-        rows = Seqs0,
-        user_acc=Acc
-    } = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    Seqs = fabric_dict:erase(Worker, Seqs0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters, rows=Seqs}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), % report the exit reason to the callback
-        {error, Resp}
-    end;
-
-handle_message(_, _, #collector{limit=0} = State) -> % limit exhausted: ignore the message and stop
-    {stop, State};
-
-handle_message(#change{key=Seq} = Row0, {Worker, From}, St) -> % a change row sent by a worker
-    #collector{
-        query_args = #changes_args{include_docs=IncludeDocs},
-        callback = Callback,
-        counters = S0,
-        limit = Limit,
-        user_acc = AccIn
-    } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Seq, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        % this check should not be necessary at all, as holes in the ranges
-        % created from DOWN messages would have led to errors
-        case fabric_view:is_progress_possible(S2) of
-        true ->
-            Row = Row0#change{key = pack_seqs(S2)}, % replace the worker's seq with the packed seq of all counters
-            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-            gen_server:reply(From, Go), % relay the callback's verdict to the worker
-            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
-        false ->
-            Reason = {range_not_covered, <<"progress not possible">>},
-            Callback({error, Reason}, AccIn),
-            gen_server:reply(From, stop),
-            {stop, St#collector{counters=S2}}
-        end
-    end;
-
-handle_message({complete, EndSeq}, Worker, State) -> % a worker finished and reports its end sequence
-    #collector{
-        callback = Callback,
-        counters = S0,
-        total_rows = Completed, % override: this field is used here as a count of completed workers
-        user_acc = Acc
-    } = State,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        S1 = fabric_dict:store(Worker, EndSeq, S0),
-        % unlikely to have overlaps here, but possible w/ filters
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        NewState = State#collector{counters=S2, total_rows=Completed+1},
-        case fabric_dict:size(S2) =:= (Completed+1) of % have all remaining workers completed?
-        true ->
-            % check ranges are covered, again this should not be necessary
-            % as any holes in the ranges due to DOWN messages would have errored
-            % out sooner
-            case fabric_view:is_progress_possible(S2) of
-            true ->
-                {stop, NewState};
-            false ->
-                Reason = {range_not_covered, <<"progress not possible">>},
-                Callback({error, Reason}, Acc),
-                {stop, NewState}
-            end;
-        false ->
-            {ok, NewState}
-        end
-    end.
-
-make_changes_args(#changes_args{style=Style, filter=undefined}=Args) -> % no filter set: use the style value as the filter
-    Args#changes_args{filter = Style};
-make_changes_args(Args) -> % anything else is returned unchanged
-    Args.
-
-get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> % forward: start at the requested since value
-    Since;
-get_start_seq(DbName, #changes_args{dir=rev}) -> % reverse: collect update seqs from the shards
-    Shards = mem3:shards(DbName),
-    Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
-    {ok, Since} = fabric_util:recv(Workers, #shard.ref,
-        fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), % -1 marks a worker that has not answered yet
-    Since.
-
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> % record one shard's update seq in Counters
-    case fabric_dict:lookup_element(Shard, Counters) of
-    undefined ->
-        % already heard from someone else in this range
-        {ok, Counters};
-    -1 ->
-        C1 = fabric_dict:store(Shard, Seq, Counters),
-        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
-        case fabric_dict:any(-1, C2) of % is any entry still at the initial -1?
-        true ->
-            {ok, C2};
-        false ->
-            {stop, pack_seqs(C2)} % every entry answered: return the packed sequence
-        end
-    end.
-
-pack_seqs(Workers) -> % encode a list of {Shard, Seq} pairs as [SeqSum, Opaque]
-    SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], % keep only node, range and seq
-    SeqSum = lists:sum(element(2, lists:unzip(Workers))), % sum of all the seqs
-    Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
-    [SeqSum, Opaque].
-
-unpack_seqs(0, DbName) -> % 0: every shard of the database starts at sequence 0
-    fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs("0", DbName) -> % same as above, with the zero given as a string
-    fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs([_SeqNum, Opaque], DbName) -> % two-element list: the number is ignored, only Opaque is decoded
-    do_unpack_seqs(Opaque, DbName);
-
-unpack_seqs(Packed, DbName) -> % anything else: extract the opaque part with a regular expression
-    NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$", % bracketed form: [digits,"opaque"]
-    OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$", % optional digits- prefix, optional surrounding double quotes
-    Options = [{capture, [opaque], binary}],
-    Opaque = case re:run(Packed, NewPattern, Options) of
-    {match, Match} ->
-        Match;
-    nomatch ->
-        {match, Match} = re:run(Packed, OldPattern, Options), % badmatch if the old pattern does not match either
-        Match
-    end,
-    do_unpack_seqs(Opaque, DbName).
-
-do_unpack_seqs(Opaque, DbName) -> % decode Opaque into a list of {Shard, Seq} entries for DbName
-    % A preventative fix for FB 13533 to remove duplicate shards.
-    % This just picks each unique shard and keeps the largest seq
-    % value recorded.
-    Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)), % NOTE(review): binary_to_term/1 without [safe]; Opaque appears to come from the 'since' parameter -- confirm it is trusted
-    DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
-        dict:append({Node, [A, B]}, Seq, Acc)
-    end, dict:new(), Decoded),
-    Deduped = lists:map(fun({{Node, [A, B]}, SeqList}) ->
-        {Node, [A, B], lists:max(SeqList)}
-    end, dict:to_list(DedupDict)),
-
-    % Create a fabric_dict of {Shard, Seq} entries
-    % TODO relies on internal structure of fabric_dict as keylist
-    Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
-        case mem3:get_shard(DbName, Node, [A,B]) of
-        {ok, Shard} ->
-            [{Shard, Seq}];
-        {error, not_found} ->
-            [] % no such shard found: drop the entry
-        end
-    end, Deduped),
-
-    % Fill holes in the since sequence. If/when we ever start
-    % using overlapping shard ranges this will need to be updated
-    % to not include shard ranges that overlap entries in Unpacked.
-    % A quick and dirty approach would be like such:
-    %
-    %   lists:foldl(fun(S, Acc) ->
-    %       fabric_view:remove_overlapping_shards(S, Acc)
-    %   end, mem3:shards(DbName), Unpacked)
-    %
-    % Unfortunately remove_overlapping_shards isn't reusable because
-    % of its calls to rexi:kill/2. When we get to overlapping
-    % shard ranges and have to rewrite shard range management
-    % we can revisit this simpler algorithm.
-    case fabric_view:is_progress_possible(Unpacked) of
-        true ->
-            Unpacked;
-        false ->
-            Ranges = lists:usort([R || #shard{range=R} <- Unpacked]),
-            Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
-            Replacements = lists:filter(Filter, mem3:shards(DbName)),
-            Unpacked ++ [{R, 0} || R <- Replacements] % shards with other ranges are added at sequence 0
-    end.
-
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) -> % deleted row, second argument true: doc included
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) -> % deleted row without doc
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> % doc field holds an error: emit error instead of doc
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) -> % row with doc
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value}, false) -> % row without doc
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
-
-find_replacement_shards(#shard{range=Range}, AllShards) -> % pick the shards in AllShards that have the same range
-    % TODO improve this -- we might have split or merged the partition
-    [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. % only exactly equal ranges qualify
-
-validate_start_seq(DbName, Seq) -> % returns ok if Seq can be unpacked, otherwise an {error, ...} tuple
-    try unpack_seqs(Seq, DbName) of _Any -> % the unpacked value itself is discarded
-        ok
-    catch
-        error:database_does_not_exist ->
-            {error, database_does_not_exist};
-        _:_ -> % every other exception is reported as a malformed sequence
-            Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
-            {error, {bad_request, Reason}}
-    end.
-
-unpack_seqs_test() -> % feeds unpack_seqs/2 several textual forms of a packed sequence
-    meck:new(mem3), % mem3 and fabric_view are mocked for the duration of the test
-    meck:new(fabric_view),
-    meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end), % every lookup yields an empty shard record
-    meck:expect(fabric_view, is_progress_possible, fun(_) -> true end), % so the hole-filling branch is never taken
-
-    % BigCouch 0.3 style.
-    assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
-
-    % BigCouch 0.4 style.
-    assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
-
-    % BigCouch 0.4 style (as string).
-    assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-
-    % with internal hyphen
-    assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
-    "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
-    "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
-    assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
-    "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
-    "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
-
-    % CouchDB 1.2 style
-    assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\""),
-
-    meck:unload(fabric_view), % not reached if an assertion above fails
-    meck:unload(mem3).
-
-assert_shards(Packed) -> % test helper: Packed must unpack to a non-empty list of {#shard{}, _} pairs
-    ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
deleted file mode 100644
index 9e41c11..0000000
--- a/src/fabric/src/fabric_view_map.erl
+++ /dev/null
@@ -1,147 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_map).
-
--export([go/6]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> % binary group id: open the design doc first
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, View, Args, Callback, Acc0) -> % submit map_view jobs to the shards and collect their rows
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
-    #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
-    State = #collector{
-        db_name=DbName,
-        query_args = Args,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0), % every worker starts with a count of 0
-        skip = Skip,
-        limit = Limit,
-        keys = fabric_view:keydict(Keys),
-        sorted = Args#mrargs.sorted,
-        user_acc = Acc0
-    },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 1000 * 60 * 60) of % presumably a one-hour timeout in milliseconds -- TODO confirm
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc); % report the timeout through the callback
-    {error, Resp} ->
-        {ok, Resp} % Resp is returned as the result
-    after
-        rexi_monitor:stop(RexiMon), % cleanup runs on every exit path
-        fabric_util:cleanup(Workers)
-    end.
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> % node down: delegate to fabric_view:remove_down_shards/2
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) -> % a worker exited: forget it, then check coverage
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), % report the exit reason to the callback
-        {error, Resp}
-    end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> % a worker reports its total and offset
-    #collector{
-        callback = Callback,
-        counters = Counters0,
-        total_rows = Total0,
-        offset = Offset0,
-        user_acc = AccIn
-    } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 -> % count still at its initial value of 0
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of % is any entry still at 0?
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip), % offset plus skip, capped at Total
-            {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
-    end;
-
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) -> % limit reached: send complete to the callback and stop
-    #collector{callback=Callback} = State,
-    {_, Acc} = Callback(complete, State#collector.user_acc),
-    {stop, State#collector{user_acc=Acc}};
-
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> % sorted=false: pass the row straight to the callback
-    #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
-    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
-    rexi:stream_ack(From),
-    {Go, St#collector{user_acc=Acc, limit=Limit-1}};
-
-handle_message(#view_row{} = Row, {Worker, From}, State) -> % otherwise: merge the row into the buffered rows
-    #collector{
-        query_args = #mrargs{direction=Dir},
-        counters = Counters0,
-        rows = Rows0,
-        keys = KeyDict
-    } = State,
-    Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0), % tag the row with the worker that sent it
-    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    State1 = State#collector{rows=Rows, counters=Counters1},
-    fabric_view:maybe_send_row(State1);
-
-handle_message(complete, Worker, State) -> % a worker finished: bump its counter and call maybe_send_row
-    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
-    fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-merge_row(fwd, undefined, Row, Rows) -> % no key dict, forward: order by couch_view:less_json on [Key, Id]
-    lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
-        couch_view:less_json([KeyA, IdA], [KeyB, IdB])
-    end, [Row], Rows);
-merge_row(rev, undefined, Row, Rows) -> % no key dict, reverse: same comparison with the arguments swapped
-    lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
-        couch_view:less_json([KeyB, IdB], [KeyA, IdA])
-    end, [Row], Rows);
-merge_row(_, KeyDict, Row, Rows) -> % key dict given: order by each key's value in KeyDict
-    lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
-        if A =:= B -> IdA < IdB; true -> % equal keys fall back to comparing ids
-            dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
-        end
-    end, [Row], Rows).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
deleted file mode 100644
index c922a7f..0000000
--- a/src/fabric/src/fabric_view_reduce.erl
+++ /dev/null
@@ -1,127 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_reduce).
-
--export([go/6]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> % binary group id: open the design doc first
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) -> % cast reduce_view jobs to the shards and collect their rows
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
-    {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs), % VName is already bound, so this also asserts the name
-    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
-    RexiMon = fabric_util:create_monitors(Workers),
-    #mrargs{limit = Limit, skip = Skip} = Args,
-    OsProc = case os_proc_needed(RedSrc) of % reducers whose source starts with "_" need no OS process
-        true -> couch_query_servers:get_os_process(Lang);
-        _ -> nil
-    end,
-    State = #collector{
-        db_name = DbName,
-        query_args = Args,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0), % every worker starts with a count of 0
-        keys = Args#mrargs.keys,
-        skip = Skip,
-        limit = Limit,
-        lang = Lang,
-        os_proc = OsProc,
-        reducer = RedSrc,
-        rows = dict:new(),
-        user_acc = Acc0
-    },
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 1000 * 60 * 60) of % presumably a one-hour timeout in milliseconds -- TODO confirm
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc); % report the timeout through the callback
-    {error, Resp} ->
-        {ok, Resp} % Resp is returned as the result
-    after
-        rexi_monitor:stop(RexiMon), % cleanup runs on every exit path
-        fabric_util:cleanup(Workers),
-        case State#collector.os_proc of % hand the OS process back if one was obtained above
-            nil -> ok;
-            OsProc -> catch couch_query_servers:ret_os_process(OsProc)
-        end
-    end.
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> % node down: delegate to fabric_view:remove_down_shards/2
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) -> % a worker exited: forget it, then check coverage
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), % report the exit reason to the callback
-        {error, Resp}
-    end;
-
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> % a reduce row sent by a worker
-    #collector{counters = Counters0, rows = Rows0} = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, State};
-    _ ->
-        Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0), % rows: dict from key to the list of rows received for it
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        % TODO time this call, if slow don't do it every time
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
-    end;
-
-handle_message(complete, Worker, #collector{counters = Counters0} = State) -> % a worker finished
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined -> % worker is no longer tracked: nothing to do
-        {ok, State};
-    _ ->
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        fabric_view:maybe_send_row(State#collector{counters = C2})
-    end.
-
-complete_worker_test() -> % NOTE(review): uses ?assertEqual but no eunit include is visible in this module -- confirm
-    Shards =
-        mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]),
-    Workers = lists:map(fun(#shard{} = Shard) -> % give every shard a fresh ref, as a cast would
-                            Ref = make_ref(),
-                            Shard#shard{ref = Ref}
-                        end,
-                        Shards),
-    State = #collector{counters=fabric_dict:init(Workers,0)},
-    {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State),
-    ?assertEqual(orddict:size(NewState#collector.counters),length(Workers) - 2). % expects two entries dropped (presumably the other copies of the same range -- confirm)
-
-os_proc_needed(<<"_", _/binary>>) -> false; % reducer source starting with an underscore: no OS process needed
-os_proc_needed(_) -> true. % any other reducer source needs one
-


Mime
View raw message