%% emqx/apps/emqx_ft/src/emqx_ft_assembly.erl
%%
%% 417 lines
%% 12 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembly).
-export([new/1]).
-export([append/3]).
-export([update/1]).
-export([status/1]).
-export([filemeta/1]).
-export([nodes/1]).
-export([coverage/1]).
-export([properties/1]).
-export_type([t/0]).
%% Local shorthands for types declared in `emqx_ft' and `emqx_ft_storage_fs'.
-type filemeta() :: emqx_ft:filemeta().
-type filefrag() :: emqx_ft_storage_fs:filefrag().
%% Fragment type narrowed to a specific fragment kind `T'; this module uses it
%% with `{filemeta, _}' and `{segment, _}'.
-type filefrag(T) :: emqx_ft_storage_fs:filefrag(T).
-type segmentinfo() :: emqx_ft_storage_fs:segmentinfo().
%% Assembly state accumulated from the fragments appended to it.
-record(asm, {
%% Status as set by `new/1' or by the most recent `update/1'.
status :: status(),
%% Written by `update/1' when the status becomes `complete'; left unset
%% (`undefined') by `new/1'.
coverage :: coverage() | undefined,
%% Written by `update/1' together with `coverage'.
properties :: properties() | undefined,
%% Every distinct filemeta appended so far, mapped to the node and fragment
%% that supplied it most recently.
meta :: #{filemeta() => {node(), filefrag({filemeta, filemeta()})}},
%% Weighted digraph of segments: an edge runs from a segment's offset to its
%% end offset and is labelled with the node and fragment that provide it.
segs :: emqx_wdgraph:t(emqx_ft:offset(), {node(), filefrag({segment, segmentinfo()})}),
%% Size passed to `new/1'; coverage is searched from offset 0 up to this value.
size :: emqx_ft:bytes()
}).
%% Assembly status:
%%  * `{incomplete, {missing, _}}' - no filemeta has been appended yet, or no
%%    path of segments from offset 0 to the file size was found;
%%  * `complete' - exactly one filemeta is known and the segments cover the file;
%%  * `{error, {inconsistent, _}}' - more than one distinct filemeta is known.
-type status() ::
{incomplete, {missing, _}}
| complete
| {error, {inconsistent, _}}.
%% Segments picked to cover the file, each paired with the node that holds it.
-type coverage() :: [{node(), filefrag({segment, segmentinfo()})}].
-type properties() :: #{
%% Node where "most" of the segments are located.
dominant => node()
}.
%% Opaque handle; callers go through the exported functions of this module.
-opaque t() :: #asm{}.
%% @doc Create an empty assembly for a file of `TotalSize' bytes.
%% No filemeta is known at this point, so the initial status reports it as
%% missing; coverage and properties stay unset until `update/1' succeeds.
-spec new(emqx_ft:bytes()) -> t().
new(TotalSize) ->
    NoSegments = emqx_wdgraph:new(),
    NoMeta = #{},
    #asm{
        size = TotalSize,
        segs = NoSegments,
        meta = NoMeta,
        status = {incomplete, {missing, filemeta}}
    }.
%% @doc Register one fragment, or a list of fragments, as available on `Node'.
%% Lists are folded left to right; single fragments are dispatched on their
%% kind (`filemeta' or `segment'). The status is not recomputed here, see
%% `update/1'.
-spec append(t(), node(), filefrag() | [filefrag()]) -> t().
append(Asm0, Node, Frags) when is_list(Frags) ->
    AppendOne = fun(Frag, AsmAcc) -> append(AsmAcc, Node, Frag) end,
    lists:foldl(AppendOne, Asm0, Frags);
append(Asm0, Node, #{fragment := {segment, _}} = SegmentFrag) ->
    append_segmentinfo(Asm0, Node, SegmentFrag);
append(Asm0, Node, #{fragment := {filemeta, _}} = MetaFrag) ->
    append_filemeta(Asm0, Node, MetaFrag).
%% @doc Recompute the assembly status from the fragments appended so far.
%% Filemeta is checked first; coverage is only computed when exactly one
%% filemeta is known. Whenever the outcome is not `complete', only the
%% `status' field is rewritten.
-spec update(t()) -> t().
update(Asm) ->
    case status(meta, Asm) of
        {complete, _Meta} ->
            refresh_coverage(Asm);
        MetaStatus ->
            Asm#asm{status = MetaStatus}
    end.

%% Second stage of `update/1': filemeta is fine, now look at segment coverage.
refresh_coverage(Asm) ->
    case status(coverage, Asm) of
        {complete, Coverage, Props} ->
            Asm#asm{status = complete, coverage = Coverage, properties = Props};
        CoverageStatus ->
            Asm#asm{status = CoverageStatus}
    end.
%% @doc Return the status stored in the assembly, i.e. the one set by `new/1'
%% or by the latest `update/1'. Nothing is recomputed here.
-spec status(t()) -> status().
status(#asm{status = Current}) ->
    Current.
%% @doc Return the file's metadata if exactly one filemeta is known.
%% Returns `undefined' when no filemeta has been appended yet or when several
%% conflicting ones have been; the spec spells that out so callers (and
%% Dialyzer) do not assume a filemeta is always available.
-spec filemeta(t()) -> filemeta() | undefined.
filemeta(Asm) ->
    case status(meta, Asm) of
        {complete, Meta} -> Meta;
        % Missing or inconsistent filemeta: nothing trustworthy to return.
        _Other -> undefined
    end.
%% @doc Return the coverage stored by `update/1', or `undefined' if no update
%% has produced one yet.
-spec coverage(t()) -> coverage() | undefined.
coverage(#asm{coverage = Stored}) ->
    Stored.
%% @doc List every node that supplied a filemeta or a segment kept in the
%% assembly, as a sorted list without duplicates.
-spec nodes(t()) -> [node()].
nodes(#asm{meta = Metas, segs = Graph}) ->
    Remember = fun(Node, Seen) -> ordsets:add_element(Node, Seen) end,
    MetaNodes = maps:fold(
        fun(_Meta, {Node, _Fragment}, Seen) -> Remember(Node, Seen) end,
        ordsets:new(),
        Metas
    ),
    % Keep collecting into the same set instead of building a second one and
    % merging the two afterwards.
    AllNodes = emqx_wdgraph:fold(
        fun(_Offset, {_End, _Weight, {Node, _Fragment}}, Seen) -> Remember(Node, Seen) end,
        MetaNodes,
        Graph
    ),
    ordsets:to_list(AllNodes).
%% @doc Return the properties stored by `update/1' (currently the `dominant'
%% node), or `undefined' if no update has completed the assembly yet.
-spec properties(t()) -> properties() | undefined.
properties(#asm{properties = Properties}) ->
    Properties.
%% Internal status probes.
%%
%% `status(coverage, Asm)' looks for a set of segments covering the file and
%% yields `{complete, Coverage, Properties}' or `{incomplete, Missing}'.
%%
%% `status(meta, Asm)' inspects the known filemetas: none is `incomplete',
%% exactly one is `{complete, Meta}', several are an `inconsistent' error that
%% lists every conflicting fragment tagged with the node it came from.
status(coverage, #asm{segs = Graph, size = Total}) ->
    case coverage(Graph, Total) of
        {missing, _} = Gap ->
            {incomplete, Gap};
        Path when is_list(Path) ->
            Props = #{dominant => dominant(Path)},
            {complete, Path, Props}
    end;
status(meta, #asm{meta = Metas}) ->
    status(meta, maps:to_list(Metas));
status(meta, []) ->
    {incomplete, {missing, filemeta}};
status(meta, [{OnlyMeta, {_Node, _Frag}}]) ->
    {complete, OnlyMeta};
status(meta, [_First, _Second | _] = Conflicting) ->
    Tagged = [Frag#{node => Node} || {_, {Node, Frag}} <- Conflicting],
    {error, {inconsistent, Tagged}}.
%% Record a filemeta fragment. The filemeta itself is the key, so identical
%% filemetas from different nodes collapse into a single entry (the latest
%% node/fragment wins) while differing ones pile up as separate entries.
append_filemeta(Asm, Node, Fragment = #{fragment := {filemeta, Meta}}) ->
    Known = Asm#asm.meta,
    Asm#asm{meta = Known#{Meta => {Node, Fragment}}}.
%% Record a segment fragment as an edge `offset -> offset + size' in the
%% segment graph, weighted by its size when it lives on a remote node and by
%% zero when it is local.
append_segmentinfo(Asm, _Node, #{fragment := {segment, #{size := 0}}}) ->
    % NOTE
    % A zero-length segment is legal, it just cannot contribute to coverage.
    Asm;
append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) ->
    From = maps:get(offset, Info),
    Length = maps:get(size, Info),
    To = From + Length,
    Graph = add_edge(Asm#asm.segs, From, To, locality(Node) * Length, {Node, Fragment}),
    % TODO
    % Two nodes could in theory hold segments with the same offset and size but
    % different content. Telling them apart would take a checksum.
    Asm#asm{segs = Graph}.
%% Insert a segment edge unless an edge between the same offsets with an equal
%% or lower weight is already present.
add_edge(Graph, From, To, Weight, Label) ->
    % NOTE
    % Coverage is modelled as a shortest path problem on a weighted directed
    % graph. Vertices are segment offsets; an edge connects two vertices when
    % some segment starts at the first offset and ends at the second one. Edge
    % weights are segment sizes adjusted for locality, so a local segment
    % always weighs 0.
    Existing = emqx_wdgraph:find_edge(From, To, Graph),
    case supersedes(Existing, Weight) of
        true ->
            emqx_wdgraph:insert_edge(From, To, Weight, Label, Graph);
        false ->
            % NOTE
            % The heavier candidate is dropped. That is fine as long as
            % locality is what we optimize for.
            Graph
    end.

%% True when a candidate of the given weight should replace what `find_edge'
%% returned: either there is no such edge, or the known one is strictly heavier.
supersedes({KnownWeight, _Label}, Weight) ->
    Weight < KnownWeight;
supersedes(_NoSuchEdge, _Weight) ->
    true.
%% Find the cheapest chain of segments leading from offset 0 to `Total'.
%% Returns the path when there is one, `{missing, {segment, From, Total}}'
%% otherwise.
coverage(Graph, Total) ->
    case emqx_wdgraph:find_shortest_path(0, Total, Graph) of
        {false, Reached} ->
            % NOTE
            % The reported gap is only a rough approximation, but getting it
            % right would need more detail from the `emqx_wdgraph` interface.
            {missing, {segment, Reached, Total}};
        Path when is_list(Path) ->
            Path
    end.
%% Pick the node contributing the most bytes to the coverage, falling back to
%% the local node when no node has a positive total.
dominant(Coverage) ->
    % TODO: _dominance_ deserves a sharper definition, perhaps a score of sorts
    SizeOnNode = fun({Node, Segment}) -> {Node, segsize(Segment)} end,
    BytesPerNode = frequencies(SizeOnNode, Coverage),
    Fallback = node(),
    maxfreq(BytesPerNode, Fallback).
%% Sum up weights per key. `Fun' turns every list element into a
%% `{Key, Weight}' pair; the result maps each key to the total of its weights.
frequencies(Fun, List) ->
    Tally = fun(Elem, Totals) ->
        {Key, Weight} = Fun(Elem),
        case maps:find(Key, Totals) of
            {ok, SoFar} -> Totals#{Key := SoFar + Weight};
            error -> Totals#{Key => Weight}
        end
    end,
    lists:foldl(Tally, #{}, List).
%% Return the key with the largest total in `Freqs'. `Init' is returned when
%% the map is empty or no total is greater than 0. A later key only wins when
%% its total is strictly greater than the best one seen so far.
maxfreq(Freqs, Init) ->
    Pick = fun(Key, Total, {BestTotal, _BestKey} = Best) ->
        case Total > BestTotal of
            true -> {Total, Key};
            false -> Best
        end
    end,
    {_BestTotal, Winner} = maps:fold(Pick, {0, Init}, Freqs),
    Winner.
%% Weight multiplier for a segment located on `Node': 0 for the local node,
%% 1 for any other.
locality(Node) ->
    case node() of
        Node ->
            % NOTE
            % Zero cost makes locally available segments win over remote ones.
            0;
        _Elsewhere ->
            1
    end.
%% Size of a segment fragment, read from the `size' key of its segment info.
segsize(#{fragment := {segment, SegmentInfo}}) ->
    Bytes = maps:get(size, SegmentInfo),
    Bytes.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% A freshly created assembly still reports the filemeta as missing after an update.
incomplete_new_test() ->
    Asm = update(new(42)),
    ?assertEqual({incomplete, {missing, filemeta}}, status(Asm)).
%% Segments alone, even when they cover the whole file, do not make an
%% assembly complete: the filemeta is still missing.
incomplete_test() ->
    Segments = [
        segment(p1, 0, 42),
        segment(p1, 42, 100)
    ],
    Asm = update(append(new(142), node(), Segments)),
    ?assertEqual({incomplete, {missing, filemeta}}, status(Asm)).
%% The same filemeta reported by two different nodes is not a conflict.
consistent_test() ->
    Asm = append_many(new(42), [
        {n1, filemeta(m1, "blarg")},
        {n2, segment(s2, 0, 42)},
        {n3, filemeta(m3, "blarg")}
    ]),
    ?assertMatch({complete, _}, status(meta, Asm)).
%% Two different filemetas make the assembly inconsistent. For the filemeta
%% that was reported twice, only the fragment appended last is listed.
inconsistent_test() ->
    Asm = append_many(new(42), [
        {node(), segment(s1, 0, 42)},
        {n1, filemeta(m1, "blarg")},
        {n2, segment(s2, 0, 42)},
        {n2, filemeta(m1, "blorg")},
        {n3, filemeta(m3, "blarg")}
    ]),
    MetaStatus = status(meta, Asm),
    ?assertMatch(
        {error,
            {inconsistent, [
                % blarg < blorg
                #{node := n3, path := m3, fragment := {filemeta, #{name := "blarg"}}},
                #{node := n2, path := m1, fragment := {filemeta, #{name := "blorg"}}}
            ]}},
        MetaStatus
    ).
%% Non-overlapping segments appended out of order are chained by offset, and
%% the local node, holding most of the bytes, comes out as dominant.
simple_coverage_test() ->
    Local = node(),
    Asm = append_many(new(100), [
        {node42, segment(n1, 20, 30)},
        {Local, segment(n2, 0, 10)},
        {Local, segment(n3, 50, 50)},
        {Local, segment(n4, 10, 10)}
    ]),
    Result = status(coverage, Asm),
    ?assertMatch(
        {complete,
            [
                {Local, #{path := n2}},
                {Local, #{path := n4}},
                {node42, #{path := n1}},
                {Local, #{path := n3}}
            ],
            #{dominant := Local}},
        Result
    ).
%% With overlapping segments spread over several nodes, one particular chain
%% is expected to be selected; which node ends up dominant is not asserted.
redundant_coverage_test() ->
    Local = node(),
    Asm = append_many(new(70), [
        {Local, segment(n1, 0, 20)},
        {node1, segment(n2, 0, 10)},
        {Local, segment(n3, 20, 40)},
        {node2, segment(n4, 10, 10)},
        {node2, segment(n5, 50, 20)},
        {node3, segment(n6, 20, 20)},
        {Local, segment(n7, 50, 10)},
        {node1, segment(n8, 40, 10)}
    ]),
    Result = status(coverage, Asm),
    ?assertMatch(
        {complete,
            [
                {Local, #{path := n1}},
                {node3, #{path := n6}},
                {node1, #{path := n8}},
                {node2, #{path := n5}}
            ],
            #{dominant := _}},
        Result
    ).
%% When both local and remote segments can cover the file, the chain built
%% from local segments only is the one selected.
redundant_coverage_prefer_local_test() ->
    Local = node(),
    Asm = append_many(new(40), [
        {node1, segment(n1, 0, 20)},
        {Local, segment(n2, 0, 10)},
        {Local, segment(n3, 10, 10)},
        {node2, segment(n4, 20, 20)},
        {Local, segment(n5, 30, 10)},
        {Local, segment(n6, 20, 10)}
    ]),
    Result = status(coverage, Asm),
    ?assertMatch(
        {complete,
            [
                {Local, #{path := n2}},
                {Local, #{path := n3}},
                {Local, #{path := n6}},
                {Local, #{path := n5}}
            ],
            #{dominant := Local}},
        Result
    ).
%% A hole in the middle of the file is reported as a missing segment.
missing_coverage_test() ->
    Local = node(),
    Asm = append_many(new(100), [
        {Local, segment(n1, 0, 10)},
        {node1, segment(n3, 10, 20)},
        {Local, segment(n2, 0, 20)},
        {node2, segment(n4, 50, 50)},
        {Local, segment(n5, 40, 60)}
    ]),
    % {incomplete, {missing, {segment, 30, 40}}} would be more accurate
    Expected = {incomplete, {missing, {segment, 30, 100}}},
    ?assertEqual(Expected, status(coverage, Asm)).
%% The second segment starts at an offset where no other segment ends, so it
%% cannot extend the chain: the tail of the file is reported as missing.
missing_end_coverage_test() ->
    Local = node(),
    Asm = append_many(new(20), [
        {Local, segment(n1, 0, 15)},
        {node1, segment(n3, 10, 10)}
    ]),
    Expected = {incomplete, {missing, {segment, 15, 20}}},
    ?assertEqual(Expected, status(coverage, Asm)).
%% Redundant segments do not help when none of the chains reaches the end.
missing_coverage_with_redudancy_test() ->
    Local = node(),
    Asm = append_many(new(100), [
        {Local, segment(n1, 0, 10)},
        {Local, segment(n2, 0, 20)},
        {node42, segment(n3, 10, 20)},
        {node43, segment(n4, 10, 50)},
        {Local, segment(n5, 40, 60)}
    ]),
    % {incomplete, {missing, {segment, 50, 60}}}, ???
    Expected = {incomplete, {missing, {segment, 60, 100}}},
    ?assertEqual(Expected, status(coverage, Asm)).
%% Test helper: append a list of `{Node, Fragment}' pairs in order.
append_many(Asm, List) ->
    AppendPair = fun({Node, Frag}, AsmAcc) -> append(AsmAcc, Node, Frag) end,
    lists:foldl(AppendPair, Asm, List).
%% Test helper: a filemeta fragment stored at `Path' that carries only a name.
filemeta(Path, Name) ->
    Meta = #{name => Name},
    #{
        fragment => {filemeta, Meta},
        path => Path
    }.
%% Test helper: a segment fragment stored at `Path', `Size' bytes long and
%% starting at `Offset'.
segment(Path, Offset, Size) ->
    Info = #{size => Size, offset => Offset},
    #{
        fragment => {segment, Info},
        path => Path
    }.
-endif.