From 0c84fc28b041d02558a48cd2a6cff66d1c1ecaf2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 27 Feb 2023 15:59:52 +0300 Subject: [PATCH] perf(asm-ft): tradeoff optimality for computational complexity Through squashing segments table into consecutive "runs". --- apps/emqx_ft/src/emqx_ft_assembly.erl | 84 ++++++++++++++++++--------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl index 2a8ccb485..4625e5ffc 100644 --- a/apps/emqx_ft/src/emqx_ft_assembly.erl +++ b/apps/emqx_ft/src/emqx_ft_assembly.erl @@ -40,9 +40,9 @@ filemeta(), {node(), filefrag({filemeta, filemeta()})} ), - segs :: orddict:orddict( + segs :: gb_trees:tree( {emqx_ft:offset(), _Locality, _MEnd, node()}, - filefrag({segment, segmentinfo()}) + [filefrag({segment, segmentinfo()})] ), size :: emqx_ft:bytes() }). @@ -66,7 +66,7 @@ new(Size) -> #asm{ status = {incomplete, {missing, filemeta}}, meta = orddict:new(), - segs = orddict:new(), + segs = gb_trees:empty(), size = Size }. @@ -123,7 +123,7 @@ status(meta, []) -> status(meta, [_M1, _M2 | _] = Metas) -> {error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}}; status(coverage, #asm{segs = Segments, size = Size}) -> - case coverage(orddict:to_list(Segments), 0, Size) of + case coverage(squash(Segments), Size) of Coverage when is_list(Coverage) -> {complete, Coverage, #{ dominant => dominant(Coverage) @@ -145,54 +145,80 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) -> Offset = maps:get(offset, Info), Size = maps:get(size, Info), End = Offset + Size, + Index = {Offset, locality(Node), -End, Node}, + Segs = insert(Asm#asm.segs, Index, [Fragment]), Asm#asm{ % TODO % In theory it's possible to have two segments with same offset + size on % different nodes but with differing content. We'd need a checksum to % be able to disambiguate them though. - segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs) + segs = Segs }. -coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor -> - coverage(Rest, Cursor, Sz); -coverage([{{Cursor, _Locality, MEnd, Node}, Segment} | _Rest] = Segments, Cursor, Sz) -> +squash(Segs) -> + % NOTE + % Here we're "compressing" information about every known segment by adjoining + % nearby segments on the same node into "runs". + squash(Segs, gb_trees:next(gb_trees:iterator(Segs))). + +squash(Segs, {Index = {Offset, Locality, MEnd, _}, Fragments, It}) -> + SegsSquashed = squash_run(gb_trees:delete(Index, Segs), Index, Fragments, It), + ItNext = gb_trees:iterator_from({Offset, Locality, MEnd + 1, 0}, SegsSquashed), + squash(SegsSquashed, gb_trees:next(ItNext)); +squash(Segs, none) -> + Segs. + +squash_run(Segs, {Offset, Locality, MEnd, Node} = Index, Fragments, It) -> + Next = gb_trees:next(It), + case Next of + {{OffsetNext, _, MEndNext, Node} = IndexNext, FragmentsNext, ItNext} when + OffsetNext == -MEnd + -> + SegsNext = gb_trees:delete(IndexNext, Segs), + IndexSquashed = {Offset, Locality, MEndNext, Node}, + squash_run(SegsNext, IndexSquashed, Fragments ++ FragmentsNext, ItNext); + {{OffsetNext, _, _, _}, _, ItNext} when OffsetNext =< -MEnd -> + squash_run(Segs, Index, Fragments, ItNext); + _ -> + insert(Segs, Index, Fragments) + end. + +insert(Segs, Index, Fragments) -> + try + gb_trees:insert(Index, Fragments, Segs) + catch + error:{key_exists, _} -> Segs + end. + +coverage(Segs, Size) -> + coverage(gb_trees:next(gb_trees:iterator(Segs)), 0, Size). + +coverage({{Offset, _, _, _}, _Fragments, It}, Cursor, Sz) when Offset < Cursor -> + coverage(gb_trees:next(It), Cursor, Sz); +coverage({{Cursor, _Locality, MEnd, Node}, Fragments, It}, Cursor, Sz) -> % NOTE % We consider only whole fragments here, so for example from the point of view of % this algo `[{Offset1 = 0, Size1 = 15}, {Offset2 = 10, Size2 = 10}]` has no % coverage. - Tail = tail(Segments), - case coverage(Tail, -MEnd, Sz) of + ItNext = gb_trees:next(It), + case coverage(ItNext, -MEnd, Sz) of Coverage when is_list(Coverage) -> - [{Node, Segment} | Coverage]; + [{Node, Frag} || Frag <- Fragments] ++ Coverage; Missing = {missing, _} -> - case coverage(Tail, Cursor, Sz) of + case coverage(ItNext, Cursor, Sz) of CoverageAlt when is_list(CoverageAlt) -> CoverageAlt; {missing, _} -> Missing end end; -coverage([{{Offset, _MEnd, _, _}, _Segment} | _], Cursor, _Sz) when Offset > Cursor -> +coverage({{Offset, _, _MEnd, _}, _Fragments, _It}, Cursor, _Sz) when Offset > Cursor -> {missing, {segment, Cursor, Offset}}; -coverage([], Cursor, Sz) when Cursor < Sz -> +coverage(none, Cursor, Sz) when Cursor < Sz -> {missing, {segment, Cursor, Sz}}; -coverage([], Cursor, Cursor) -> +coverage(none, Cursor, Cursor) -> []. -tail([Segment | Rest]) -> - tail(Segment, Rest). - -tail({{Cursor, _, MEnd, _}, _} = Segment, [{{Cursor, _, MEnd, _}, _} | Rest]) -> - % NOTE - % Discarding segments with same offset / size, potentially located on other nodes. - % This is an optimization. They won't participate in coverage anyway given we're - % currently optimizing coverage towards locality. Yet if we instead decide to - % optimize for node dominance (e.g. compute such coverage that most of the data - % located on a single node) we'll need to account them again. - tail(Segment, Rest); -tail(_Segment, Tail) -> - Tail. - dominant(Coverage) -> % TODO: needs improvement, better defined _dominance_, maybe some score Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),