From: davisp@apache.org
To: commits@couchdb.apache.org
Reply-To: dev@couchdb.apache.org
Date: Wed, 05 Feb 2014 14:50:49 -0000
Message-Id: <3c05e65e2762408abaf143e6e198ad06@git.apache.org>
In-Reply-To: <6e58a2851da647aca8d721b7fb6b8775@git.apache.org>
References: <6e58a2851da647aca8d721b7fb6b8775@git.apache.org>
Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm
X-Mailer: ASF-Git Admin Mailer
Subject: [27/49] Remove src/fabric

http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
deleted file mode 100644
index b0a3628..0000000
--- a/src/fabric/src/fabric_view_changes.erl
+++ /dev/null
@@ -1,422 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_changes).
-
--export([go/5, pack_seqs/1, unpack_seqs/2]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--import(fabric_db_update_listener, [wait_db_updated/1]).
-
-go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
-        Feed == "longpoll" ->
-    Args = make_changes_args(Options),
-    Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
-    ok ->
-        {ok, Acc} = Callback(start, Acc0),
-        {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
-        Ref = make_ref(),
-        Parent = self(),
-        UpdateListener = {spawn_link(fabric_db_update_listener, go,
-                            [Parent, Ref, DbName, Timeout]),
-                          Ref},
-        try
-            keep_sending_changes(
-                DbName,
-                Args,
-                Callback,
-                Since,
-                Acc,
-                Timeout,
-                UpdateListener,
-                os:timestamp()
-            )
-        after
-            fabric_db_update_listener:stop(UpdateListener)
-        end;
-    Error ->
-        Callback(Error, Acc0)
-    end;
-
-go(DbName, "normal", Options, Callback, Acc0) ->
-    Args = make_changes_args(Options),
-    Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
-    ok ->
-        {ok, Acc} = Callback(start, Acc0),
-        {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
-            DbName,
-            Args,
-            Callback,
-            Since,
-            Acc,
-            5000
-        ),
-        Callback({stop, pack_seqs(Seqs)}, AccOut);
-    Error ->
-        Callback(Error, Acc0)
-    end.
-
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
-    #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
-    {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
-    #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
-    LastSeq = pack_seqs(NewSeqs),
-    if Limit > Limit2, Feed == "longpoll" ->
-        Callback({stop, LastSeq}, AccOut);
-    true ->
-        WaitForUpdate = wait_db_updated(UpListen),
-        AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
-        Max = case config:get("fabric", "changes_duration") of
-        undefined ->
-            infinity;
-        MaxStr ->
-            list_to_integer(MaxStr)
-        end,
-        case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
-        {undefined, _, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
-        {_, true, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
-        _ ->
-            {ok, AccTimeout} = Callback(timeout, AccOut),
-            keep_sending_changes(
-                DbName,
-                Args#changes_args{limit=Limit2},
-                Callback,
-                LastSeq,
-                AccTimeout,
-                Timeout,
-                UpListen,
-                T0
-            )
-        end
-    end.
-
-send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
-    LiveNodes = [node() | nodes()],
-    AllLiveShards = mem3:live_shards(DbName, LiveNodes),
-    Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
-        case lists:member(Shard, AllLiveShards) of
-        true ->
-            Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
-            [{Shard#shard{ref = Ref}, Seq}];
-        false ->
-            % Find some replacement shards to cover the missing range
-            % TODO It's possible in rare cases of shard merging to end up
-            % with overlapping shard ranges from this technique
-            lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
-                Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
-                {NewShard#shard{ref = Ref}, 0}
-            end, find_replacement_shards(Shard, AllLiveShards))
-        end
-    end, unpack_seqs(PackedSeqs, DbName)),
-    {Workers, _} = lists:unzip(Seqs),
-    RexiMon = fabric_util:create_monitors(Workers),
-    State = #collector{
-        query_args = ChangesArgs,
-        callback = Callback,
-        counters = orddict:from_list(Seqs),
-        user_acc = AccIn,
-        limit = ChangesArgs#changes_args.limit,
-        rows = Seqs % store sequence positions instead
-    },
-    %% TODO: errors need to be handled here
-    try
-        receive_results(Workers, State, Timeout, Callback)
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
-    end.
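
%% [Editorial sketch, not part of the removed file.] go/5 above drives a
%% caller-supplied callback through a small protocol: start, one message
%% per change row, timeout heartbeats, and a final {stop, PackedSeq}. A
%% minimal collecting callback might look like this (hypothetical code;
%% the {change, {Props}} shape comes from changes_row/2 further down):
changes_cb(start, Acc) ->
    {ok, Acc};
changes_cb({change, {Props}}, Acc) ->
    {ok, [Props | Acc]};                 % returning ok keeps workers streaming
changes_cb(timeout, Acc) ->
    {ok, Acc};                           % heartbeat; nothing to do
changes_cb({stop, LastSeq}, Acc) ->
    {ok, {LastSeq, lists:reverse(Acc)}}; % final packed since-seq plus rows
changes_cb({error, Reason}, Acc) ->
    {ok, {error, Reason, Acc}}.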
-
-receive_results(Workers, State, Timeout, Callback) ->
-    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
-            infinity, Timeout) of
-    {timeout, NewState0} ->
-        {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc),
-        NewState = NewState0#collector{user_acc = AccOut},
-        receive_results(Workers, NewState, Timeout, Callback);
-    {_, NewState} ->
-        {ok, NewState}
-    end.
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{
-        callback=Callback,
-        counters=Counters0,
-        rows = Seqs0,
-        user_acc=Acc
-    } = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    Seqs = fabric_dict:erase(Worker, Seqs0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters, rows=Seqs}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
-
-handle_message(_, _, #collector{limit=0} = State) ->
-    {stop, State};
-
-handle_message(#change{key=Seq} = Row0, {Worker, From}, St) ->
-    #collector{
-        query_args = #changes_args{include_docs=IncludeDocs},
-        callback = Callback,
-        counters = S0,
-        limit = Limit,
-        user_acc = AccIn
-    } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Seq, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        % this check should not be necessary at all, as holes in the ranges
-        % created from DOWN messages would have led to errors
-        case fabric_view:is_progress_possible(S2) of
-        true ->
-            Row = Row0#change{key = pack_seqs(S2)},
-            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-            gen_server:reply(From, Go),
-            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
-        false ->
-            Reason = {range_not_covered, <<"progress not possible">>},
-            Callback({error, Reason}, AccIn),
-            gen_server:reply(From, stop),
-            {stop, St#collector{counters=S2}}
-        end
-    end;
-
-handle_message({complete, EndSeq}, Worker, State) ->
-    #collector{
-        callback = Callback,
-        counters = S0,
-        total_rows = Completed, % override
-        user_acc = Acc
-    } = State,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        S1 = fabric_dict:store(Worker, EndSeq, S0),
-        % unlikely to have overlaps here, but possible w/ filters
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        NewState = State#collector{counters=S2, total_rows=Completed+1},
-        case fabric_dict:size(S2) =:= (Completed+1) of
-        true ->
-            % check ranges are covered, again this should not be neccessary
-            % as any holes in the ranges due to DOWN messages would have errored
-            % out sooner
-            case fabric_view:is_progress_possible(S2) of
-            true ->
-                {stop, NewState};
-            false ->
-                Reason = {range_not_covered, <<"progress not possible">>},
-                Callback({error, Reason}, Acc),
-                {stop, NewState}
-            end;
-        false ->
-            {ok, NewState}
-        end
-    end.
-
-make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
-    Args#changes_args{filter = Style};
-make_changes_args(Args) ->
-    Args.
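
%% [Sketch, not from the removed file.] The rexi_EXIT clause above drops
%% the dead worker and keeps going only if fabric_view:is_progress_possible/1
%% still holds, i.e. the surviving workers' shard ranges cover the whole
%% hash ring. A simplified version of that idea, assuming [Begin, End]
%% ranges over the 32-bit key space (the real check lives in fabric_view):
ranges_cover(Counters) ->
    Ranges = lists:usort([R || {#shard{range = R}, _} <- Counters]),
    covers(0, Ranges).

covers(Next, []) ->
    Next > 16#FFFFFFFF;                  % walked past the end of the ring
covers(Next, [[B, E] | Rest]) when B =< Next ->
    covers(erlang:max(Next, E + 1), Rest);
covers(_, _) ->
    false.                               % a gap: no shard starts early enough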
-
-get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
-    Since;
-get_start_seq(DbName, #changes_args{dir=rev}) ->
-    Shards = mem3:shards(DbName),
-    Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
-    {ok, Since} = fabric_util:recv(Workers, #shard.ref,
-        fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
-    Since.
-
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
-    case fabric_dict:lookup_element(Shard, Counters) of
-    undefined ->
-        % already heard from someone else in this range
-        {ok, Counters};
-    -1 ->
-        C1 = fabric_dict:store(Shard, Seq, Counters),
-        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
-        case fabric_dict:any(-1, C2) of
-        true ->
-            {ok, C2};
-        false ->
-            {stop, pack_seqs(C2)}
-        end
-    end.
-
-pack_seqs(Workers) ->
-    SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
-    SeqSum = lists:sum(element(2, lists:unzip(Workers))),
-    Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
-    [SeqSum, Opaque].
-
-unpack_seqs(0, DbName) ->
-    fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs("0", DbName) ->
-    fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs([_SeqNum, Opaque], DbName) ->
-    do_unpack_seqs(Opaque, DbName);
-
-unpack_seqs(Packed, DbName) ->
-    NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
-    OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
-    Options = [{capture, [opaque], binary}],
-    Opaque = case re:run(Packed, NewPattern, Options) of
-    {match, Match} ->
-        Match;
-    nomatch ->
-        {match, Match} = re:run(Packed, OldPattern, Options),
-        Match
-    end,
-    do_unpack_seqs(Opaque, DbName).
-
-do_unpack_seqs(Opaque, DbName) ->
-    % A preventative fix for FB 13533 to remove duplicate shards.
-    % This just picks each unique shard and keeps the largest seq
-    % value recorded.
-    Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)),
-    DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
-        dict:append({Node, [A, B]}, Seq, Acc)
-    end, dict:new(), Decoded),
-    Deduped = lists:map(fun({{Node, [A, B]}, SeqList}) ->
-        {Node, [A, B], lists:max(SeqList)}
-    end, dict:to_list(DedupDict)),
-
-    % Create a fabric_dict of {Shard, Seq} entries
-    % TODO relies on internal structure of fabric_dict as keylist
-    Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
-        case mem3:get_shard(DbName, Node, [A,B]) of
-        {ok, Shard} ->
-            [{Shard, Seq}];
-        {error, not_found} ->
-            []
-        end
-    end, Deduped),
-
-    % Fill holes in the since sequence. If/when we ever start
-    % using overlapping shard ranges this will need to be updated
-    % to not include shard ranges that overlap entries in Upacked.
-    % A quick and dirty approach would be like such:
-    %
-    %   lists:foldl(fun(S, Acc) ->
-    %       fabric_view:remove_overlapping_shards(S, Acc)
-    %   end, mem3:shards(DbName), Unpacked)
-    %
-    % Unfortunately remove_overlapping_shards isn't reusable because
-    % of its calls to rexi:kill/2. When we get to overlapping
-    % shard ranges and have to rewrite shard range management
-    % we can revisit this simpler algorithm.
-    case fabric_view:is_progress_possible(Unpacked) of
-    true ->
-        Unpacked;
-    false ->
-        Ranges = lists:usort([R || #shard{range=R} <- Unpacked]),
-        Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
-        Replacements = lists:filter(Filter, mem3:shards(DbName)),
-        Unpacked ++ [{R, 0} || R <- Replacements]
-    end.
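
%% [Sketch, not from the removed file.] pack_seqs/1 above flattens the
%% per-shard positions into [Sum, Opaque]: the sum is only a human-readable
%% hint (unpack_seqs/2 ignores it as _SeqNum), while the opaque part is the
%% real state -- a compressed external term, base64url-encoded. Round trip
%% with hypothetical nodes and ranges:
pack_unpack_demo() ->
    SeqList = [{'node1@host', [0, 16#7FFFFFFF], 10},
               {'node2@host', [16#80000000, 16#FFFFFFFF], 12}],
    Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
    Packed = [10 + 12, Opaque],          % the shape pack_seqs/1 returns
    SeqList = binary_to_term(couch_util:decodeBase64Url(Opaque)),  % round trip
    Packed.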
-
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
-
-find_replacement_shards(#shard{range=Range}, AllShards) ->
-    % TODO make this moar betta -- we might have split or merged the partition
-    [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
-
-validate_start_seq(DbName, Seq) ->
-    try unpack_seqs(Seq, DbName) of _Any ->
-        ok
-    catch
-    error:database_does_not_exist ->
-        {error, database_does_not_exist};
-    _:_ ->
-        Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
-        {error, {bad_request, Reason}}
-    end.
-
-unpack_seqs_test() ->
-    meck:new(mem3),
-    meck:new(fabric_view),
-    meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end),
-    meck:expect(fabric_view, is_progress_possible, fun(_) -> true end),
-
-    % BigCouch 0.3 style.
-    assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
-
-    % BigCouch 0.4 style.
-    assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
-
-    % BigCouch 0.4 style (as string).
-    assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-    assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-
-    % with internal hypen
-    assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
-    "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
-    "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
-    assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
-    "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
-    "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
-
-    % CouchDB 1.2 style
-    assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
-    "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
-    "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\""),
-
-    meck:unload(fabric_view),
-    meck:unload(mem3).
-
-assert_shards(Packed) ->
-    ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).
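
The test above pins down the since-seq encodings that unpack_seqs/2 has to
accept across upgrades. As a quick reference (opaque payloads elided):

    "23423-g1AAAA..."           BigCouch 0.3: count, hyphen, opaque
    [23423, <<"g1AAAA...">>]    BigCouch 0.4: decoded JSON pair
    "[23423, \"g1AAAA...\"]"    BigCouch 0.4 as an undecoded string
    "\"23423-g1AAAA...\""       CouchDB 1.2: 0.3 style, JSON-quoted

A hedged shell demo of how the NewPattern clause pulls out the opaque part
(hypothetical payload):

    1> Pattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$".
    2> re:run("[23423, \"g1AA...\"]", Pattern, [{capture, [opaque], binary}]).
    {match,[<<"g1AA...">>]}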
http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
deleted file mode 100644
index 9e41c11..0000000
--- a/src/fabric/src/fabric_view_map.erl
+++ /dev/null
@@ -1,147 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_map).
-
--export([go/6]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
-    #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
-    State = #collector{
-        db_name=DbName,
-        query_args = Args,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0),
-        skip = Skip,
-        limit = Limit,
-        keys = fabric_view:keydict(Keys),
-        sorted = Args#mrargs.sorted,
-        user_acc = Acc0
-    },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 1000 * 60 * 60) of
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc);
-    {error, Resp} ->
-        {ok, Resp}
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
-    end.
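
%% [Sketch, not from the removed file.] A hypothetical caller of go/6,
%% showing the callback messages this collector emits: total_and_offset
%% once every shard has reported, then rows, then complete. The module,
%% design-doc and view names are illustrative, and the {row, _} shape is
%% an assumption about fabric_view:transform_row/1 output.
map_query(DbName) ->
    Callback = fun
        ({total_and_offset, Total, Offset}, Acc) ->
            {ok, [{meta, Total, Offset} | Acc]};
        ({row, _} = Row, Acc) ->
            {ok, [Row | Acc]};
        (complete, Acc) ->
            {ok, lists:reverse(Acc)};
        ({error, Reason}, _Acc) ->
            {error, Reason}
    end,
    fabric_view_map:go(DbName, <<"ddoc_name">>, <<"view_name">>,
        #mrargs{limit = 10}, Callback, []).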
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
-    #collector{
-        callback = Callback,
-        counters = Counters0,
-        total_rows = Total0,
-        offset = Offset0,
-        user_acc = AccIn
-    } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
-    end;
-
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
-    #collector{callback=Callback} = State,
-    {_, Acc} = Callback(complete, State#collector.user_acc),
-    {stop, State#collector{user_acc=Acc}};
-
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
-    #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
-    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
-    rexi:stream_ack(From),
-    {Go, St#collector{user_acc=Acc, limit=Limit-1}};
-
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
-    #collector{
-        query_args = #mrargs{direction=Dir},
-        counters = Counters0,
-        rows = Rows0,
-        keys = KeyDict
-    } = State,
-    Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0),
-    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    State1 = State#collector{rows=Rows, counters=Counters1},
-    fabric_view:maybe_send_row(State1);
-
-handle_message(complete, Worker, State) ->
-    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
-    fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-merge_row(fwd, undefined, Row, Rows) ->
-    lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
-        couch_view:less_json([KeyA, IdA], [KeyB, IdB])
-    end, [Row], Rows);
-merge_row(rev, undefined, Row, Rows) ->
-    lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
-        couch_view:less_json([KeyB, IdB], [KeyA, IdA])
-    end, [Row], Rows);
-merge_row(_, KeyDict, Row, Rows) ->
-    lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
-        if A =:= B -> IdA < IdB; true ->
-            dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
-        end
-    end, [Row], Rows).
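
merge_row/4 above keeps the buffered rows sorted as workers stream them in:
by view collation of {Key, Id} for fwd, reversed for rev, and by position
in the caller's keys list when one was supplied. A reduced sketch of the
fwd case, substituting plain Erlang term order for couch_view:less_json/2
(an assumption for illustration only; real view collation is ICU-based and
orders JSON values differently):

    %% Rows stay sorted; the new row is spliced in at its collation slot.
    merge_fwd(Row, Rows) ->
        lists:merge(fun({KeyA, IdA}, {KeyB, IdB}) ->
            {KeyA, IdA} =< {KeyB, IdB}
        end, [Row], Rows).

    %% merge_fwd({2, <<"b">>}, [{1, <<"a">>}, {3, <<"c">>}])
    %% => [{1, <<"a">>}, {2, <<"b">>}, {3, <<"c">>}]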
http://git-wip-us.apache.org/repos/asf/couchdb/blob/753e7462/src/fabric/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
deleted file mode 100644
index c922a7f..0000000
--- a/src/fabric/src/fabric_view_reduce.erl
+++ /dev/null
@@ -1,127 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_reduce).
-
--export([go/6]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
-    {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
-    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
-    RexiMon = fabric_util:create_monitors(Workers),
-    #mrargs{limit = Limit, skip = Skip} = Args,
-    OsProc = case os_proc_needed(RedSrc) of
-        true -> couch_query_servers:get_os_process(Lang);
-        _ -> nil
-    end,
-    State = #collector{
-        db_name = DbName,
-        query_args = Args,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0),
-        keys = Args#mrargs.keys,
-        skip = Skip,
-        limit = Limit,
-        lang = Lang,
-        os_proc = OsProc,
-        reducer = RedSrc,
-        rows = dict:new(),
-        user_acc = Acc0
-    },
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 1000 * 60 * 60) of
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc);
-    {error, Resp} ->
-        {ok, Resp}
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers),
-        case State#collector.os_proc of
-            nil -> ok;
-            OsProc -> catch couch_query_servers:ret_os_process(OsProc)
-        end
-    end.
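
%% [Sketch, not from the removed file.] Unlike the map collector, go/6
%% above seeds rows with dict:new(): handle_message/3 below buffers rows
%% keyed by reduce key, so partial reductions of the same key arriving
%% from different shards can be combined into a single output row (an
%% inference from the dict:append/3 call, not spelled out in the diff).
%% Illustrative shape only:
%%
%%   dict:append(Key, Row#view_row{worker = {Worker, From}}, Rows0)
%%   %% Rows : Key => [#view_row{} from shard 1, #view_row{} from shard 2, ...]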
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
-
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
-    #collector{counters = Counters0, rows = Rows0} = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, State};
-    _ ->
-        Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        % TODO time this call, if slow don't do it every time
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
-    end;
-
-handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        fabric_view:maybe_send_row(State#collector{counters = C2})
-    end.
-
-complete_worker_test() ->
-    Shards =
-        mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]),
-    Workers = lists:map(fun(#shard{} = Shard) ->
-            Ref = make_ref(),
-            Shard#shard{ref = Ref}
-        end,
-        Shards),
-    State = #collector{counters=fabric_dict:init(Workers,0)},
-    {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State),
-    ?assertEqual(orddict:size(NewState#collector.counters),length(Workers) - 2).
-
-os_proc_needed(<<"_", _/binary>>) -> false;
-os_proc_needed(_) -> true.
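
os_proc_needed/1 at the end of the file is the whole builtin-versus-custom
reducer test: builtin reducers are named with a leading underscore and run
natively in Erlang, while anything else (typically a JavaScript function
body) requires an external query server for its language. A quick shell
illustration of the two clauses:

    1> os_proc_needed(<<"_sum">>).
    false
    2> os_proc_needed(<<"_count">>).
    false
    3> os_proc_needed(<<"function(keys, values) { return sum(values); }">>).
    true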