From: rnewson@apache.org
To: commits@couchdb.apache.org
Subject: mem3 commit: updated refs/heads/master to 37a57d3
Date: Wed, 24 Sep 2014 20:12:57 +0000 (UTC)

Repository: couchdb-mem3
Updated Branches:
  refs/heads/master d92e4c463 -> 37a57d349

Delete mem3_rebalance for now, currently useless

Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/37a57d34
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/37a57d34
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/37a57d34

Branch: refs/heads/master
Commit: 37a57d3492c4147ab67db2ea6047c4f98fcc902e
Parents: d92e4c4
Author: Robert Newson
Authored: Wed Sep 24 21:12:36 2014 +0100
Committer: Robert Newson
Committed: Wed Sep 24 21:12:36 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 530 --------------------------------------------
 1 file changed, 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/37a57d34/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
deleted file mode 100644
index 1c1ed64..0000000
--- a/src/mem3_rebalance.erl
+++ /dev/null
@@ -1,530 +0,0 @@
-% Copyright 2013 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_rebalance).
-
--export([
-    contract/0,
-    contract/1,
-    contract/3,
-    expand/0,
-    expand/1,
-    expand/3,
-    fix_zoning/0,
-    fix_zoning/1,
-    fix_zoning/2,
-    print/1
-]).
-
-% Exposed for debugging purposes
--export([
-    shard_count_by_node/1,
-    shard_count_view/0
-]).
-
--include("mem3.hrl").
-
--record (gacc, {
-    donors,
-    targets,
-    moves,
-    limit,
-    target_level
-}).
-
-%% @equiv expand(1000)
--spec expand() -> [{atom(), #shard{}, node()}].
-expand() ->
-    expand(1000).
-
-%% @doc Expands a cluster without requiring each DB to be optimally balanced.
--spec expand(integer() | global) -> [{atom(), #shard{}, node()}].
-expand(global) ->
-    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
-    erlang:put(fd, FD),
-    global_expand(surviving_nodes(), [], 1000);
-
-%% @doc Expands all databases in the cluster, stopping at Limit operations.
-expand(Limit) when is_integer(Limit), Limit > 0 ->
-    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
-    erlang:put(fd, FD),
-    TargetNodes = surviving_nodes(),
-    LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
-    LocalBalanceOps = apply_to_cluster(LocalBalanceFun, Limit),
-    % Now apply additional operations as needed to achieve global balance.
-    global_expand(TargetNodes, LocalBalanceOps, Limit);
-
-expand(DbName) when is_binary(DbName); is_list(DbName) ->
-    TargetNodes = surviving_nodes(),
-    expand(DbName, TargetNodes, []).
-
-%% @doc Computes a plan to balance the shards across the target nodes.
--spec expand(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
-    [{atom(), #shard{}, node()}].
-expand(DbName, Nodes, PrevMoves) ->
-    Shards = mem3:shards(DbName),
-    Floor = length(Shards) div length(Nodes),
-    % Ensure every target node reaches the floor
-    {NewShards, Moves0} = rebalance2(Floor, Shards, Nodes, Nodes, PrevMoves),
-    % Now look for any nodes with more than floor+1 shards
-    {_, Moves} = rebalance2(Floor+1, NewShards, Nodes, Nodes, Moves0),
-    Moves.
-
-%% @equiv contract(1000)
--spec contract() -> [{atom(), #shard{}, node()}].
-contract() ->
-    contract(1000).
-
-%% @doc Computes a plan to remove up to Limit shards from nodes in "decom" zone.
--spec contract(integer()) -> [{atom(), #shard{}, node()}].
-contract(Limit) when is_integer(Limit), Limit > 0 ->
-    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
-    erlang:put(fd, FD),
-    TargetNodes = surviving_nodes(),
-    apply_to_cluster(fun(Db, Moves) -> contract(Db, TargetNodes, Moves) end, Limit);
-
-contract(DbName) when is_binary(DbName); is_list(DbName) ->
-    TargetNodes = surviving_nodes(),
-    contract(DbName, TargetNodes, []).
-
-%% @doc Computes a plan to consolidate shards from a single database onto the
-%% supplied set of nodes.
--spec contract(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
-    [{atom(), #shard{}, node()}].
-contract(DbName, TargetNodes, PrevMoves) ->
-    {OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
-        lists:member(Node, TargetNodes)
-    end, mem3:shards(DbName)),
-    find_homes(MoveThese, shards_by_node(OK, TargetNodes), PrevMoves).
-
-%% @equiv fix_zoning(1000)
--spec fix_zoning() -> [{atom(), #shard{}, node()}].
-fix_zoning() ->
-    fix_zoning(1000).
-
-%% @doc Computes a plan containing up to Limit operations to repair replica
-%% levels and improper zoning.
--spec fix_zoning(integer()) -> [{atom(), #shard{}, node()}].
-fix_zoning(Limit) when is_integer(Limit), Limit > 0 ->
-    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
-    erlang:put(fd, FD),
-    apply_to_cluster(fun fix_zoning/2, Limit);
-
-fix_zoning(DbName) when is_binary(DbName); is_list(DbName) ->
-    fix_zoning(DbName, []).
-
-%% @doc Computes a plan to repair replica levels and improper zoning for a
-%% single database.
--spec fix_zoning(DbName::iolist(), [{atom(), #shard{}, node()}]) ->
-    [{atom(), #shard{}, node()}].
-fix_zoning(DbName, PrevMoves) ->
-    IdealZoning = orddict:from_list(mem3:get_placement([])),
-    ByRange = shards_by_range(mem3:shards(DbName)),
-    orddict:fold(fun(_Range, Shards, Acc) ->
-        compute_moves(IdealZoning, computed_zoning(Shards), Shards, Acc)
-    end, PrevMoves, ByRange).
-
-%% Internal functions.
-
-global_expand(TargetNodes0, LocalOps, Limit) ->
-    TargetNodes = [couch_util:to_binary(Node) || Node <- TargetNodes0],
-    CountByNode = lists:filter(fun({Node, _Count}) ->
-        lists:member(Node, TargetNodes)
-    end, shard_count_by_node(LocalOps)),
-    TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
-    TargetLevel = TotalCount div length(TargetNodes),
-    Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} ||
-        {N, C} <- CountByNode, C > TargetLevel],
-    InternalAcc0 = #gacc{
-        donors = orddict:from_list(Donors),
-        targets = TargetNodes0,
-        moves = LocalOps,
-        limit = Limit - length(LocalOps),
-        target_level = TargetLevel
-    },
-    try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
-        #gacc{moves = Moves} ->
-            Moves
-    catch
-        {complete, Moves} ->
-            Moves
-    end.
-
-donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
-    throw({complete, Moves});
-donate_fold(#shard{node = Node} = Shard, Acc0) ->
-    #gacc{
-        donors = Donors,
-        targets = Nodes,
-        moves = Moves,
-        limit = DC,
-        target_level = TargetLevel
-    } = Acc0,
-    Zone = mem3:node_info(Node, <<"zone">>),
-    Shards = apply_shard_moves(mem3:shards(Shard#shard.dbname), Moves),
-    InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
-    SortedByCount = lists:sort(smallest_first(Moves), InZone),
-    SourceCount = get_shard_count(Node, SortedByCount),
-    GlobalShardCounts = shard_count_by_node(Moves),
-    TotalSource = get_global_shard_count(Node, GlobalShardCounts),
-    Fun = fun({CandidateNode, OwnShards}) ->
-        HasRange = lists:keymember(Shard#shard.range, #shard.range, OwnShards),
-        TargetCount = get_shard_count(CandidateNode, SortedByCount),
-        TotalTarget = get_global_shard_count(CandidateNode, GlobalShardCounts),
-        if
-            CandidateNode =:= Node ->
-                % Can't move a shard to ourselves
-                true;
-            HasRange ->
-                % The candidate already has this shard
-                true;
-            TargetCount >= SourceCount ->
-                % Executing this move would create a local imbalance in the DB
-                true;
-            TotalTarget > TargetLevel ->
-                % The candidate has already exceeded the target level
-                true;
-            (TotalSource - TotalTarget) < 2 ->
-                % Donating here is wasted work
-                true;
-            true ->
-                false
-        end
-    end,
-    case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of
-        {true, true} ->
-            case lists:dropwhile(Fun, SortedByCount) of
-                [{Target, _} | _] ->
-                    NewMoves = [{move, Shard, Target} | Moves],
-                    print({move, Shard, Target}),
-                    Acc0#gacc{
-                        moves = NewMoves,
-                        limit = DC - 1,
-                        donors = update_donors(Node, Donors, NewMoves)
-                    };
-                [] ->
-                    Acc0
-            end;
-        _ ->
-            Acc0
-    end;
-donate_fold(_Shard, Acc) ->
-    Acc.
-
-update_donors(Node, Donors, Moves) ->
-    NewDonors = case orddict:fetch(Node, Donors) of
-        1 ->
-            orddict:erase(Node, Donors);
-        X ->
-            orddict:store(Node, X-1, Donors)
-    end,
-    case orddict:size(NewDonors) of
-        0 ->
-            throw({complete, Moves});
-        _ ->
-            NewDonors
-    end.
-
-get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
-    length(couch_util:get_value(AtomKey, ShardsByNode, [])).
-
-get_global_shard_count(Node, Counts) when is_atom(Node) ->
-    get_global_shard_count(couch_util:to_binary(Node), Counts);
-get_global_shard_count(Node, Counts) when is_binary(Node) ->
-    couch_util:get_value(Node, Counts, 0).
-
-compute_moves(IdealZoning, IdealZoning, _Copies, OtherMoves) ->
-    OtherMoves;
-compute_moves(IdealZoning, ActualZoning, Copies, OtherMoves) ->
-    {Donor, Recipient} = find_donor_and_recipient(IdealZoning, ActualZoning),
-    pair_up(Donor, Recipient, Copies, OtherMoves).
-
-find_donor_and_recipient(IdealZoning, ActualZoning) ->
-    lists:foldl(fun({Zone, IdealCopies}, {D,R}) ->
-        case couch_util:get_value(Zone, ActualZoning, 0) of
-            Actual when Actual < IdealCopies ->
-                {D, Zone};
-            Actual when Actual > IdealCopies ->
-                {Zone, R};
-            _ ->
-                {D, R}
-        end
-    end, {nil, nil}, IdealZoning).
-
-pair_up(_, nil, _Copies, Moves) ->
-    Moves;
-pair_up(nil, Recipient, Copies, Moves) ->
-    % We've got an insufficient replica level -- a recipient but no donor
-    Candidate = hd(Copies),
-    TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
-    print({copy, Candidate, TargetNode}),
-    [{copy, Candidate, TargetNode}|Moves];
-pair_up(Donor, Recipient, Copies, Moves) ->
-    Candidate = hd(lists:filter(fun(#shard{node = Node}) ->
-        mem3:node_info(Node, <<"zone">>) =:= Donor
-    end, Copies)),
-    TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
-    print({move, Candidate, TargetNode}),
-    [{move, Candidate, TargetNode}|Moves].
-
-choose_node_in_target_zone(#shard{dbname = DbName} = Candidate, Take, Moves) ->
-    TargetNodes = allowed_nodes(fun(Zone) -> Zone =:= Take end),
-    CurrentShards = apply_shard_moves(mem3:shards(DbName), Moves),
-    ByTargetNode = shards_by_node(CurrentShards, TargetNodes),
-    InZone = filter_map_by_zone(ByTargetNode, Take),
-    {TargetNode, _} = find_home(Candidate, InZone, Moves),
-    TargetNode.
-
--spec find_homes([#shard{}], [{node(), [#shard{}]}], [{atom(), #shard{}, node()}]) ->
-    [{atom(), #shard{}, node()}].
-find_homes([], _ShardsByTargetNode, Result) ->
-    Result;
-find_homes([#shard{node = Node0} = Shard | Rest], ShardsByNode, PrevMoves) ->
-    InZone = filter_map_by_zone(ShardsByNode, mem3:node_info(Node0, <<"zone">>)),
-    {TargetNode, NewMap} = find_home(Shard, InZone, PrevMoves),
-    print({move, Shard, TargetNode}),
-    MergedMap = orddict:merge(fun(_, V1, _) -> V1 end, NewMap, ShardsByNode),
-    find_homes(Rest, MergedMap, [{move, Shard, TargetNode} | PrevMoves]).
-
-find_home(Shard, ShardsByNode, PrevMoves) ->
-    SortedByCount = lists:sort(smallest_first(PrevMoves), ShardsByNode),
-    % Ensure that the target node is not already an owner of this range
-    [{TargetNode, _} | _] = lists:dropwhile(fun({_Node, Shards}) ->
-        lists:keymember(Shard#shard.range, #shard.range, Shards)
-    end, SortedByCount),
-    NewMap = orddict:append(TargetNode, Shard#shard{node=TargetNode}, ShardsByNode),
-    {TargetNode, NewMap}.
-
-rebalance2(_TargetLevel, Shards, _Nodes, [], Moves) ->
-    {Shards, Moves};
-rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
-    ShardsForNode = [S || S <- Shards, S#shard.node =:= Node],
-    CurrentLevel = length(ShardsForNode),
-    case CurrentLevel < TargetLevel of
-        true ->
-            case victim(TargetLevel, Shards, Nodes, Node, Moves) of
-                {ok, Victim} ->
-                    print({move, Victim, Node}),
-                    rebalance2(TargetLevel,
-                        replace(Victim, Victim#shard{node=Node}, Shards),
-                        Nodes, [Node|Rest], [{move, Victim, Node}|Moves]);
-                false ->
-                    rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
-            end;
-        false ->
-            rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
-    end.
-
-victim(TargetLevel, Shards, Nodes, TargetNode, Moves) ->
-    % Build a map of shards owned by nodes in the target zone.
-    TargetZone = mem3:node_info(TargetNode, <<"zone">>),
-    ShardsByNode0 = filter_map_by_zone(shards_by_node(Shards, Nodes), TargetZone),
-    % Filter nodes that would drop below target level (including TargetNode).
-    ShardsByNode1 = [{N, SS} || {N, SS} <- ShardsByNode0, length(SS) > TargetLevel],
-    % Prefer to take from a node with more shards than others.
-    ShardsByNode2 = lists:sort(largest_first(Moves), ShardsByNode1),
-    % Don't take a shard for a range already hosted by the target.
-    TargetRanges = [S#shard.range || S <- Shards, S#shard.node =:= TargetNode],
-    ShardsByNode3 = lists:map(fun({N, SS}) ->
-        {N, [S || S <- SS, not lists:member(S#shard.range, TargetRanges)]}
-    end, ShardsByNode2),
-    % Find the first node that still owns a candidate shard.
-    case lists:dropwhile(fun({_, SS}) -> SS =:= [] end, ShardsByNode3) of
-        [] ->
-            false;
-        [{_SourceNode, [Victim | _OtherShards]} | _] ->
-            {ok, Victim}
-    end.
-
-largest_first(PrevMoves) ->
-    % use the global shard count on each node to break the tie
-    Global = shard_count_by_node(PrevMoves),
-    fun(A, B) -> sort_by_count(A, B, Global) >= 0 end.
-
-smallest_first(PrevMoves) ->
-    % use the global shard count on each node to break the tie
-    Global = shard_count_by_node(PrevMoves),
-    fun(A, B) -> sort_by_count(A, B, Global) =< 0 end.
-
-sort_by_count({NodeA, SA}, {NodeB, SB}, Global) when length(SA) =:= length(SB) ->
-    CountA = get_global_shard_count(NodeA, Global),
-    CountB = get_global_shard_count(NodeB, Global),
-    cmp(CountA, CountB);
-sort_by_count({_, A}, {_, B}, _) ->
-    cmp(length(A), length(B)).
-
-cmp(A, B) when A < B ->
-    -1;
-cmp(A, B) when A > B ->
-    1;
-cmp(_, _) ->
-    0.
-
-replace(A, B, List) ->
-    replace(A, B, List, []).
-
-replace(_A, _B, [], Acc) ->
-    Acc;
-replace(A, B, [A | Rest], Acc) ->
-    replace(A, B, Rest, [B | Acc]);
-replace(A, B, [C | Rest], Acc) ->
-    replace(A, B, Rest, [C | Acc]).
-
-%% @doc Takes a list of copy/move operations and applies them to the current
-%% set of shards. Any moves that reference a shard not in the current set
-%% will be ignored.
-apply_shard_moves(Shards, []) ->
-    Shards;
-apply_shard_moves(Shards, [{move, Shard, Node}| Rest]) ->
-    NewShards = replace(Shard, Shard#shard{node = Node}, Shards, []),
-    apply_shard_moves(NewShards, Rest);
-apply_shard_moves(Shards, [{copy, Shard, Node}| Rest]) ->
-    case lists:member(Shard, Shards) of
-        true ->
-            apply_shard_moves([Shard#shard{node = Node} | Shards], Rest);
-        false ->
-            apply_shard_moves(Shards, Rest)
-    end.
-
-allowed_nodes(Fun) ->
-    lists:filter(fun(Node) ->
-        Fun(mem3:node_info(Node, <<"zone">>))
-    end, surviving_nodes()).
-
-surviving_nodes() ->
-    lists:filter(fun(Node) ->
-        mem3:node_info(Node, <<"decom">>) =/= true
-    end, mem3:nodes()).
-
-shards_by_node(Shards, Nodes) ->
-    % Ensure every target node is present in the orddict
-    ShardsByNode0 = orddict:from_list([{N,[]} || N <- Nodes]),
-    lists:foldl(fun(#shard{node = Node} = Shard, Acc) ->
-        orddict:append(Node, Shard, Acc)
-    end, ShardsByNode0, Shards).
-
-filter_map_by_zone(ShardsByNode, Zone) ->
-    Result = orddict:filter(fun(Node, _Shards) ->
-        mem3:node_info(Node, <<"zone">>) =:= Zone
-    end, ShardsByNode),
-    if Result =:= [] ->
-        erlang:error({empty_zone, Zone});
-    true ->
-        Result
-    end.
-
-shards_by_range(Shards) ->
-    lists:foldl(fun(#shard{range = Range} = Shard, OD) ->
-        orddict:append(Range, Shard, OD)
-    end, orddict:new(), Shards).
-
-computed_zoning(Shards) ->
-    lists:foldl(fun(#shard{node = Node}, OD) ->
-        orddict:update_counter(mem3:node_info(Node, <<"zone">>), 1, OD)
-    end, orddict:new(), Shards).
-
-shard_count_by_node(PrevMoves) ->
-    Map0 = case erlang:get(shard_count_by_node) of
-        undefined ->
-            try shard_count_view() catch _:_ -> [] end;
-        {T0, Map} ->
-            case timer:now_diff(os:timestamp(), T0) div 1000 of
-                Delta when Delta < 5000 ->
-                    Map;
-                _Else ->
-                    try shard_count_view() catch _:_ -> [] end
-            end
-    end,
-    % Incorporate the operations we've already scheduled into the total counts
-    lists:foldl(fun
-        ({copy, _, TargetNode}, OD0) ->
-            orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD0);
-        ({move, #shard{node = SourceNode}, TargetNode}, OD0) ->
-            OD1 = orddict:update_counter(couch_util:to_binary(SourceNode), -1, OD0),
-            orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD1)
-    end, orddict:from_list(Map0), PrevMoves).
-
-shard_count_view() ->
-    %% TODO rewrite CouchDB's internal view API. Wow!
-    {ok, Db} = couch_db:open(<<"dbs">>, []),
-    DDocId = <<"_design/rebalance">>,
-    Fold = fun view_cb/2,
-    Args = [{group_level, exact}],
-    {ok, Map} = couch_mrview:query_view(
-        Db, DDocId, <<"count_by_node">>, Fold, [], Args),
-    erlang:put(shard_count_by_node, {os:timestamp(), Map}),
-    Map.
-
-view_cb({meta, _}, Acc) ->
-    {ok, Acc};
-view_cb({row, Row}, Acc) ->
-    {key, Node} = lists:keyfind(key, 1, Row),
-    {value, Count} = lists:keyfind(value, 1, Row),
-    {ok, [{Node, Count} | Acc]};
-view_cb(complete, Acc) ->
-    {ok, lists:reverse(Acc)}.
-
-print({Op, Shard, TargetNode} = Operation) ->
-    {match, [SourceId, Cluster]} = re:run(
-        atom_to_list(Shard#shard.node),
-        "dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
-        [{capture, all_but_first, binary}]
-    ),
-    {match, [TargetId, Cluster]} = re:run(
-        atom_to_list(TargetNode),
-        "dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
-        [{capture, all_but_first, binary}]
-    ),
-    {match, [Range, Account, DbName]} = re:run(
-        Shard#shard.name,
-        "shards/(?<range>[0-9a-f\-]+)/(?<account>.+)/(?<dbname>[a-z\\_][a-z0-9\\_\\$()\\+\\-\\/]+)\.[0-9]{8}",
-        [{capture, all_but_first, binary}]
-    ),
-    OpName = case Op of move -> move2; _ -> Op end,
-    case get(fd) of
-        undefined ->
-            io:format("clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
-                Cluster, Account, DbName, Range, SourceId, TargetId]);
-        FD ->
-            io:format(FD, "clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
-                Cluster, Account, DbName, Range, SourceId, TargetId])
-    end,
-    Operation;
-
-print(Operations) when is_list(Operations) ->
-    [print(Operation) || Operation <- Operations].
-
-apply_to_cluster(UserFun, Limit) ->
-    try mem3_shards:fold(cluster_fold_fun(UserFun, Limit), {nil, []}) of
-        {_LastDb, Moves} ->
-            Moves
-    catch
-        {complete, Moves} ->
-            Moves
-    end.
-
-cluster_fold_fun(UserFun, Limit) ->
-    fun
-        (#shard{dbname = DbName}, {DbName, PrevMoves}) ->
-            {DbName, PrevMoves};
-        (#shard{dbname = DbName}, {_PrevName, PrevMoves}) ->
-            Moves = UserFun(DbName, PrevMoves),
-            check_limit(Moves, Limit),
-            {DbName, Moves}
-    end.
-
-check_limit(Moves, Limit) when length(Moves) >= Limit ->
-    throw({complete, Moves});
-check_limit(_, _) ->
-    ok.
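
----------------------------------------------------------------------

For anyone exhuming this module from the archive: each public entry point
above (expand, contract, fix_zoning) returned its plan as a list of
{move | copy, #shard{}, node()} tuples, which print/1 then rendered one
operation per line. A minimal sketch of consuming such a plan follows,
assuming a simplified stand-in shard record in place of the real definition
from mem3.hrl; module and function names here are hypothetical.

%% plan_summary.erl -- illustrative sketch only; the shard record below is a
%% stand-in for the real one defined in mem3.hrl.
-module(plan_summary).
-export([count_by_target/1, demo/0]).

-record(shard, {name, node, dbname, range}).

%% Tally scheduled operations per target node, in the same spirit as the
%% deleted shard_count_by_node/1, which also folded {move|copy, Shard, Target}
%% tuples into per-node counts.
count_by_target(Plan) ->
    lists:foldl(fun({Op, _Shard, Target}, Acc) when Op =:= move; Op =:= copy ->
        orddict:update_counter(Target, 1, Acc)
    end, orddict:new(), Plan).

demo() ->
    Shard = #shard{name = <<"shards/00000000-7fffffff/db.12345678">>,
                   node = 'node1@host', dbname = <<"db">>,
                   range = [0, 2147483647]},
    Plan = [{move, Shard, 'node2@host'}, {copy, Shard, 'node3@host'}],
    %% => [{'node2@host',1},{'node3@host',1}]
    count_by_target(Plan).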