perf(asm-ft): tradeoff optimality for computational complexity
Through squashing segments table into consecutive "runs".
This commit is contained in:
parent
97cfdf8eef
commit
0c84fc28b0
|
@ -40,9 +40,9 @@
|
|||
filemeta(),
|
||||
{node(), filefrag({filemeta, filemeta()})}
|
||||
),
|
||||
segs :: orddict:orddict(
|
||||
segs :: gb_trees:tree(
|
||||
{emqx_ft:offset(), _Locality, _MEnd, node()},
|
||||
filefrag({segment, segmentinfo()})
|
||||
[filefrag({segment, segmentinfo()})]
|
||||
),
|
||||
size :: emqx_ft:bytes()
|
||||
}).
|
||||
|
@ -66,7 +66,7 @@ new(Size) ->
|
|||
#asm{
|
||||
status = {incomplete, {missing, filemeta}},
|
||||
meta = orddict:new(),
|
||||
segs = orddict:new(),
|
||||
segs = gb_trees:empty(),
|
||||
size = Size
|
||||
}.
|
||||
|
||||
|
@ -123,7 +123,7 @@ status(meta, []) ->
|
|||
status(meta, [_M1, _M2 | _] = Metas) ->
|
||||
{error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}};
|
||||
status(coverage, #asm{segs = Segments, size = Size}) ->
|
||||
case coverage(orddict:to_list(Segments), 0, Size) of
|
||||
case coverage(squash(Segments), Size) of
|
||||
Coverage when is_list(Coverage) ->
|
||||
{complete, Coverage, #{
|
||||
dominant => dominant(Coverage)
|
||||
|
@ -145,54 +145,80 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) ->
|
|||
Offset = maps:get(offset, Info),
|
||||
Size = maps:get(size, Info),
|
||||
End = Offset + Size,
|
||||
Index = {Offset, locality(Node), -End, Node},
|
||||
Segs = insert(Asm#asm.segs, Index, [Fragment]),
|
||||
Asm#asm{
|
||||
% TODO
|
||||
% In theory it's possible to have two segments with same offset + size on
|
||||
% different nodes but with differing content. We'd need a checksum to
|
||||
% be able to disambiguate them though.
|
||||
segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs)
|
||||
segs = Segs
|
||||
}.
|
||||
|
||||
coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor ->
|
||||
coverage(Rest, Cursor, Sz);
|
||||
coverage([{{Cursor, _Locality, MEnd, Node}, Segment} | _Rest] = Segments, Cursor, Sz) ->
|
||||
squash(Segs) ->
|
||||
% NOTE
|
||||
% Here we're "compressing" information about every known segment by adjoining
|
||||
% nearby segments on the same node into "runs".
|
||||
squash(Segs, gb_trees:next(gb_trees:iterator(Segs))).
|
||||
|
||||
squash(Segs, {Index = {Offset, Locality, MEnd, _}, Fragments, It}) ->
|
||||
SegsSquashed = squash_run(gb_trees:delete(Index, Segs), Index, Fragments, It),
|
||||
ItNext = gb_trees:iterator_from({Offset, Locality, MEnd + 1, 0}, SegsSquashed),
|
||||
squash(SegsSquashed, gb_trees:next(ItNext));
|
||||
squash(Segs, none) ->
|
||||
Segs.
|
||||
|
||||
squash_run(Segs, {Offset, Locality, MEnd, Node} = Index, Fragments, It) ->
|
||||
Next = gb_trees:next(It),
|
||||
case Next of
|
||||
{{OffsetNext, _, MEndNext, Node} = IndexNext, FragmentsNext, ItNext} when
|
||||
OffsetNext == -MEnd
|
||||
->
|
||||
SegsNext = gb_trees:delete(IndexNext, Segs),
|
||||
IndexSquashed = {Offset, Locality, MEndNext, Node},
|
||||
squash_run(SegsNext, IndexSquashed, Fragments ++ FragmentsNext, ItNext);
|
||||
{{OffsetNext, _, _, _}, _, ItNext} when OffsetNext =< -MEnd ->
|
||||
squash_run(Segs, Index, Fragments, ItNext);
|
||||
_ ->
|
||||
insert(Segs, Index, Fragments)
|
||||
end.
|
||||
|
||||
insert(Segs, Index, Fragments) ->
|
||||
try
|
||||
gb_trees:insert(Index, Fragments, Segs)
|
||||
catch
|
||||
error:{key_exists, _} -> Segs
|
||||
end.
|
||||
|
||||
coverage(Segs, Size) ->
|
||||
coverage(gb_trees:next(gb_trees:iterator(Segs)), 0, Size).
|
||||
|
||||
coverage({{Offset, _, _, _}, _Fragments, It}, Cursor, Sz) when Offset < Cursor ->
|
||||
coverage(gb_trees:next(It), Cursor, Sz);
|
||||
coverage({{Cursor, _Locality, MEnd, Node}, Fragments, It}, Cursor, Sz) ->
|
||||
% NOTE
|
||||
% We consider only whole fragments here, so for example from the point of view of
|
||||
% this algo `[{Offset1 = 0, Size1 = 15}, {Offset2 = 10, Size2 = 10}]` has no
|
||||
% coverage.
|
||||
Tail = tail(Segments),
|
||||
case coverage(Tail, -MEnd, Sz) of
|
||||
ItNext = gb_trees:next(It),
|
||||
case coverage(ItNext, -MEnd, Sz) of
|
||||
Coverage when is_list(Coverage) ->
|
||||
[{Node, Segment} | Coverage];
|
||||
[{Node, Frag} || Frag <- Fragments] ++ Coverage;
|
||||
Missing = {missing, _} ->
|
||||
case coverage(Tail, Cursor, Sz) of
|
||||
case coverage(ItNext, Cursor, Sz) of
|
||||
CoverageAlt when is_list(CoverageAlt) ->
|
||||
CoverageAlt;
|
||||
{missing, _} ->
|
||||
Missing
|
||||
end
|
||||
end;
|
||||
coverage([{{Offset, _MEnd, _, _}, _Segment} | _], Cursor, _Sz) when Offset > Cursor ->
|
||||
coverage({{Offset, _, _MEnd, _}, _Fragments, _It}, Cursor, _Sz) when Offset > Cursor ->
|
||||
{missing, {segment, Cursor, Offset}};
|
||||
coverage([], Cursor, Sz) when Cursor < Sz ->
|
||||
coverage(none, Cursor, Sz) when Cursor < Sz ->
|
||||
{missing, {segment, Cursor, Sz}};
|
||||
coverage([], Cursor, Cursor) ->
|
||||
coverage(none, Cursor, Cursor) ->
|
||||
[].
|
||||
|
||||
tail([Segment | Rest]) ->
|
||||
tail(Segment, Rest).
|
||||
|
||||
tail({{Cursor, _, MEnd, _}, _} = Segment, [{{Cursor, _, MEnd, _}, _} | Rest]) ->
|
||||
% NOTE
|
||||
% Discarding segments with same offset / size, potentially located on other nodes.
|
||||
% This is an optimization. They won't participate in coverage anyway given we're
|
||||
% currently optimizing coverage towards locality. Yet if we instead decide to
|
||||
% optimize for node dominance (e.g. compute such coverage that most of the data
|
||||
% located on a single node) we'll need to account them again.
|
||||
tail(Segment, Rest);
|
||||
tail(_Segment, Tail) ->
|
||||
Tail.
|
||||
|
||||
dominant(Coverage) ->
|
||||
% TODO: needs improvement, better defined _dominance_, maybe some score
|
||||
Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),
|
||||
|
|
Loading…
Reference in New Issue