diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl index 652636088..bea320bbf 100644 --- a/apps/emqx_ft/src/emqx_ft_assembly.erl +++ b/apps/emqx_ft/src/emqx_ft_assembly.erl @@ -40,10 +40,7 @@ filemeta(), {node(), filefrag({filemeta, filemeta()})} ), - segs :: gb_trees:tree( - {emqx_ft:offset(), _Locality, _MEnd, node()}, - [filefrag({segment, segmentinfo()})] - ), + segs :: emqx_wdgraph:t(emqx_ft:offset(), {node(), filefrag({segment, segmentinfo()})}), size :: emqx_ft:bytes() }). @@ -66,7 +63,7 @@ new(Size) -> #asm{ status = {incomplete, {missing, filemeta}}, meta = orddict:new(), - segs = gb_trees:empty(), + segs = emqx_wdgraph:new(), size = Size }. @@ -154,142 +151,32 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) -> segs = Segs }. -coverage(Segs, Size) -> - find_shortest_path(Segs, 0, Size). - -find_shortest_path(G1, From, To) -> +add_edge(Segs, Offset, End, Weight, Label) -> % NOTE - % This is a Dijkstra shortest path algorithm implemented on top of `gb_trees`. - % It is one-way right now, for simplicity sake. - G2 = set_cost(G1, From, 0, []), - case find_shortest_path(G2, From, 0, To) of - {found, G3} -> - construct_path(G3, From, To, []); - {error, Last} -> - % NOTE: this is actually just an estimation of what is missing. - {missing, {segment, Last, emqx_maybe:define(find_successor(G2, Last), To)}} - end. - -find_shortest_path(G1, Node, Cost, Target) -> - Edges = get_edges(G1, Node), - G2 = update_neighbours(G1, Node, Cost, Edges), - case take_queued(G2) of - {Target, _NextCost, G3} -> - {found, G3}; - {Next, NextCost, G3} -> - find_shortest_path(G3, Next, NextCost, Target); - none -> - {error, Node} - end. - -construct_path(_G, From, From, Acc) -> - Acc; -construct_path(G, From, To, Acc) -> - {Prev, Label} = get_label(G, To), - construct_path(G, From, Prev, [Label | Acc]). - -update_neighbours(G1, Node, NodeCost, Edges) -> - lists:foldl( - fun({Neighbour, Weight, Label}, GAcc) -> - case is_visited(GAcc, Neighbour) of - false -> - NeighCost = NodeCost + Weight, - CurrentCost = get_cost(GAcc, Neighbour), - case NeighCost < CurrentCost of - true -> - set_cost(GAcc, Neighbour, NeighCost, {Node, Label}); - false -> - GAcc - end; - true -> - GAcc - end - end, - G1, - Edges - ). - -add_edge(G, Node, ToNode, WeightIn, EdgeLabel) -> - Edges = tree_lookup({Node}, G, []), - case lists:keyfind(ToNode, 1, Edges) of - {ToNode, Weight, _} when Weight =< WeightIn -> + % We are expressing coverage problem as a shortest path problem on weighted directed + % graph, where nodes are segments offsets, two nodes are connected with edge if + % there is a segment which "covers" these offsets (i.e. it starts at first node's + % offset and ends at second node's offst) and weights are segments sizes adjusted + % for locality (i.e. weight are always 0 for any local segment). + case emqx_wdgraph:find_edge(Offset, End, Segs) of + {WeightWas, _Label} when WeightWas =< Weight -> % NOTE % Discarding any edges with higher weight here. This is fine as long as we % optimize for locality. - G; + Segs; _ -> - EdgesNext = lists:keystore(ToNode, 1, Edges, {ToNode, WeightIn, EdgeLabel}), - tree_update({Node}, EdgesNext, G) + emqx_wdgraph:insert_edge(Offset, End, Weight, Label, Segs) end. -get_edges(G, Node) -> - tree_lookup({Node}, G, []). - -get_cost(G, Node) -> - tree_lookup({Node, cost}, G, inf). - -get_label(G, Node) -> - gb_trees:get({Node, label}, G). - -set_cost(G1, Node, Cost, Label) -> - G3 = - case tree_lookup({Node, cost}, G1, inf) of - CostWas when CostWas /= inf -> - {true, G2} = gb_trees:take({queued, CostWas, Node}, G1), - tree_update({queued, Cost, Node}, true, G2); - inf -> - tree_update({queued, Cost, Node}, true, G1) - end, - G4 = tree_update({Node, cost}, Cost, G3), - G5 = tree_update({Node, label}, Label, G4), - G5. - -take_queued(G1) -> - It = gb_trees:iterator_from({queued, 0, 0}, G1), - case gb_trees:next(It) of - {{queued, Cost, Node} = Index, true, _It} -> - {Node, Cost, gb_trees:delete(Index, G1)}; - _ -> - none - end. - -is_visited(G, Node) -> - case tree_lookup({Node, cost}, G, inf) of - inf -> - false; - Cost -> - not tree_lookup({queued, Cost, Node}, G, false) - end. - -find_successor(G, Node) -> - case gb_trees:next(gb_trees:iterator_from({Node}, G)) of - {{Node}, _, It} -> - case gb_trees:next(It) of - {{Successor}, _, _} -> - Successor; - _ -> - undefined - end; - {{Successor}, _, _} -> - Successor; - _ -> - undefined - end. - -tree_lookup(Index, Tree, Default) -> - case gb_trees:lookup(Index, Tree) of - {value, V} -> - V; - none -> - Default - end. - -tree_update(Index, Value, Tree) -> - case gb_trees:take_any(Index, Tree) of - {_, TreeNext} -> - gb_trees:insert(Index, Value, TreeNext); - error -> - gb_trees:insert(Index, Value, Tree) +coverage(Segs, Size) -> + case emqx_wdgraph:find_shortest_path(0, Size, Segs) of + Path when is_list(Path) -> + Path; + {false, LastOffset} -> + % NOTE + % This is far from being accurate, but needs no hairy specifics in the + % `emqx_wdgraph` interface. + {missing, {segment, LastOffset, Size}} end. dominant(Coverage) -> @@ -452,7 +339,8 @@ missing_coverage_test() -> ], Asm = append_many(new(100), Segs), ?assertEqual( - {incomplete, {missing, {segment, 30, 40}}}, + % {incomplete, {missing, {segment, 30, 40}}} would be more accurate + {incomplete, {missing, {segment, 30, 100}}}, status(coverage, Asm) ).