417 lines
12 KiB
Erlang
417 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_ft_assembly).
|
|
|
|
-export([new/1]).
|
|
-export([append/3]).
|
|
-export([update/1]).
|
|
|
|
-export([status/1]).
|
|
-export([filemeta/1]).
|
|
-export([nodes/1]).
|
|
-export([coverage/1]).
|
|
-export([properties/1]).
|
|
|
|
-export_type([t/0]).
|
|
|
|
-type filemeta() :: emqx_ft:filemeta().
|
|
-type filefrag() :: emqx_ft_storage_fs:filefrag().
|
|
-type filefrag(T) :: emqx_ft_storage_fs:filefrag(T).
|
|
-type segmentinfo() :: emqx_ft_storage_fs:segmentinfo().
|
|
|
|
-record(asm, {
|
|
status :: status(),
|
|
coverage :: coverage() | undefined,
|
|
properties :: properties() | undefined,
|
|
meta :: #{filemeta() => {node(), filefrag({filemeta, filemeta()})}},
|
|
segs :: emqx_wdgraph:t(emqx_ft:offset(), {node(), filefrag({segment, segmentinfo()})}),
|
|
size :: emqx_ft:bytes()
|
|
}).
|
|
|
|
-type status() ::
|
|
{incomplete, {missing, _}}
|
|
| complete
|
|
| {error, {inconsistent, _}}.
|
|
|
|
-type coverage() :: [{node(), filefrag({segment, segmentinfo()})}].
|
|
|
|
-type properties() :: #{
|
|
%% Node where "most" of the segments are located.
|
|
dominant => node()
|
|
}.
|
|
|
|
-opaque t() :: #asm{}.
|
|
|
|
-spec new(emqx_ft:bytes()) -> t().
|
|
new(Size) ->
|
|
#asm{
|
|
status = {incomplete, {missing, filemeta}},
|
|
meta = #{},
|
|
segs = emqx_wdgraph:new(),
|
|
size = Size
|
|
}.
|
|
|
|
-spec append(t(), node(), filefrag() | [filefrag()]) -> t().
|
|
append(Asm, Node, Fragments) when is_list(Fragments) ->
|
|
lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments);
|
|
append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
|
|
append_filemeta(Asm, Node, Fragment);
|
|
append(Asm, Node, Segment = #{fragment := {segment, _}}) ->
|
|
append_segmentinfo(Asm, Node, Segment).
|
|
|
|
-spec update(t()) -> t().
|
|
update(Asm) ->
|
|
case status(meta, Asm) of
|
|
{complete, _Meta} ->
|
|
case status(coverage, Asm) of
|
|
{complete, Coverage, Props} ->
|
|
Asm#asm{
|
|
status = complete,
|
|
coverage = Coverage,
|
|
properties = Props
|
|
};
|
|
Status ->
|
|
Asm#asm{status = Status}
|
|
end;
|
|
Status ->
|
|
Asm#asm{status = Status}
|
|
end.
|
|
|
|
-spec status(t()) -> status().
|
|
status(#asm{status = Status}) ->
|
|
Status.
|
|
|
|
-spec filemeta(t()) -> filemeta().
|
|
filemeta(Asm) ->
|
|
case status(meta, Asm) of
|
|
{complete, Meta} -> Meta;
|
|
_Other -> undefined
|
|
end.
|
|
|
|
-spec coverage(t()) -> coverage() | undefined.
|
|
coverage(#asm{coverage = Coverage}) ->
|
|
Coverage.
|
|
|
|
-spec nodes(t()) -> [node()].
|
|
nodes(#asm{meta = Meta, segs = Segs}) ->
|
|
S1 = maps:fold(
|
|
fun(_Meta, {Node, _Fragment}, Acc) ->
|
|
ordsets:add_element(Node, Acc)
|
|
end,
|
|
ordsets:new(),
|
|
Meta
|
|
),
|
|
S2 = emqx_wdgraph:fold(
|
|
fun(_Offset, {_End, _, {Node, _Fragment}}, Acc) ->
|
|
ordsets:add_element(Node, Acc)
|
|
end,
|
|
ordsets:new(),
|
|
Segs
|
|
),
|
|
ordsets:to_list(ordsets:union(S1, S2)).
|
|
|
|
properties(#asm{properties = Properties}) ->
|
|
Properties.
|
|
|
|
status(meta, #asm{meta = Meta}) ->
|
|
status(meta, maps:to_list(Meta));
|
|
status(meta, [{Meta, {_Node, _Frag}}]) ->
|
|
{complete, Meta};
|
|
status(meta, []) ->
|
|
{incomplete, {missing, filemeta}};
|
|
status(meta, [_M1, _M2 | _] = Metas) ->
|
|
{error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}};
|
|
status(coverage, #asm{segs = Segments, size = Size}) ->
|
|
case coverage(Segments, Size) of
|
|
Coverage when is_list(Coverage) ->
|
|
{complete, Coverage, #{
|
|
dominant => dominant(Coverage)
|
|
}};
|
|
Missing = {missing, _} ->
|
|
{incomplete, Missing}
|
|
end.
|
|
|
|
append_filemeta(Asm, Node, Fragment = #{fragment := {filemeta, Meta}}) ->
|
|
Asm#asm{
|
|
meta = maps:put(Meta, {Node, Fragment}, Asm#asm.meta)
|
|
}.
|
|
|
|
append_segmentinfo(Asm, _Node, #{fragment := {segment, #{size := 0}}}) ->
|
|
% NOTE
|
|
% Empty segments are valid but meaningless for coverage.
|
|
Asm;
|
|
append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) ->
|
|
Offset = maps:get(offset, Info),
|
|
Size = maps:get(size, Info),
|
|
End = Offset + Size,
|
|
Segs = add_edge(Asm#asm.segs, Offset, End, locality(Node) * Size, {Node, Fragment}),
|
|
Asm#asm{
|
|
% TODO
|
|
% In theory it's possible to have two segments with same offset + size on
|
|
% different nodes but with differing content. We'd need a checksum to
|
|
% be able to disambiguate them though.
|
|
segs = Segs
|
|
}.
|
|
|
|
add_edge(Segs, Offset, End, Weight, Label) ->
|
|
% NOTE
|
|
% We are expressing coverage problem as a shortest path problem on weighted directed
|
|
% graph, where nodes are segments offsets, two nodes are connected with edge if
|
|
% there is a segment which "covers" these offsets (i.e. it starts at first node's
|
|
% offset and ends at second node's offst) and weights are segments sizes adjusted
|
|
% for locality (i.e. weight are always 0 for any local segment).
|
|
case emqx_wdgraph:find_edge(Offset, End, Segs) of
|
|
{WeightWas, _Label} when WeightWas =< Weight ->
|
|
% NOTE
|
|
% Discarding any edges with higher weight here. This is fine as long as we
|
|
% optimize for locality.
|
|
Segs;
|
|
_ ->
|
|
emqx_wdgraph:insert_edge(Offset, End, Weight, Label, Segs)
|
|
end.
|
|
|
|
coverage(Segs, Size) ->
|
|
case emqx_wdgraph:find_shortest_path(0, Size, Segs) of
|
|
Path when is_list(Path) ->
|
|
Path;
|
|
{false, LastOffset} ->
|
|
% NOTE
|
|
% This is far from being accurate, but needs no hairy specifics in the
|
|
% `emqx_wdgraph` interface.
|
|
{missing, {segment, LastOffset, Size}}
|
|
end.
|
|
|
|
dominant(Coverage) ->
|
|
% TODO: needs improvement, better defined _dominance_, maybe some score
|
|
Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),
|
|
maxfreq(Freqs, node()).
|
|
|
|
frequencies(Fun, List) ->
|
|
lists:foldl(
|
|
fun(E, Acc) ->
|
|
{K, N} = Fun(E),
|
|
maps:update_with(K, fun(M) -> M + N end, N, Acc)
|
|
end,
|
|
#{},
|
|
List
|
|
).
|
|
|
|
maxfreq(Freqs, Init) ->
|
|
{_, Max} = maps:fold(
|
|
fun
|
|
(F, N, {M, _MF}) when N > M -> {N, F};
|
|
(_F, _N, {M, MF}) -> {M, MF}
|
|
end,
|
|
{0, Init},
|
|
Freqs
|
|
),
|
|
Max.
|
|
|
|
locality(Node) when Node =:= node() ->
|
|
% NOTE
|
|
% This should prioritize locally available segments over those on remote nodes.
|
|
0;
|
|
locality(_RemoteNode) ->
|
|
1.
|
|
|
|
segsize(#{fragment := {segment, Info}}) ->
|
|
maps:get(size, Info).
|
|
|
|
-ifdef(TEST).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
incomplete_new_test() ->
|
|
?assertEqual(
|
|
{incomplete, {missing, filemeta}},
|
|
status(update(new(42)))
|
|
).
|
|
|
|
incomplete_test() ->
|
|
?assertEqual(
|
|
{incomplete, {missing, filemeta}},
|
|
status(
|
|
update(
|
|
append(new(142), node(), [
|
|
segment(p1, 0, 42),
|
|
segment(p1, 42, 100)
|
|
])
|
|
)
|
|
)
|
|
).
|
|
|
|
consistent_test() ->
|
|
Asm1 = append(new(42), n1, [filemeta(m1, "blarg")]),
|
|
Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]),
|
|
Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]),
|
|
?assertMatch({complete, _}, status(meta, Asm3)).
|
|
|
|
inconsistent_test() ->
|
|
Asm1 = append(new(42), node(), [segment(s1, 0, 42)]),
|
|
Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]),
|
|
Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]),
|
|
Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]),
|
|
?assertMatch(
|
|
{error,
|
|
{inconsistent, [
|
|
% blarg < blorg
|
|
#{node := n3, path := m3, fragment := {filemeta, #{name := "blarg"}}},
|
|
#{node := n2, path := m1, fragment := {filemeta, #{name := "blorg"}}}
|
|
]}},
|
|
status(meta, Asm4)
|
|
).
|
|
|
|
simple_coverage_test() ->
|
|
Node = node(),
|
|
Segs = [
|
|
{node42, segment(n1, 20, 30)},
|
|
{Node, segment(n2, 0, 10)},
|
|
{Node, segment(n3, 50, 50)},
|
|
{Node, segment(n4, 10, 10)}
|
|
],
|
|
Asm = append_many(new(100), Segs),
|
|
?assertMatch(
|
|
{complete,
|
|
[
|
|
{Node, #{path := n2}},
|
|
{Node, #{path := n4}},
|
|
{node42, #{path := n1}},
|
|
{Node, #{path := n3}}
|
|
],
|
|
#{dominant := Node}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
redundant_coverage_test() ->
|
|
Node = node(),
|
|
Segs = [
|
|
{Node, segment(n1, 0, 20)},
|
|
{node1, segment(n2, 0, 10)},
|
|
{Node, segment(n3, 20, 40)},
|
|
{node2, segment(n4, 10, 10)},
|
|
{node2, segment(n5, 50, 20)},
|
|
{node3, segment(n6, 20, 20)},
|
|
{Node, segment(n7, 50, 10)},
|
|
{node1, segment(n8, 40, 10)}
|
|
],
|
|
Asm = append_many(new(70), Segs),
|
|
?assertMatch(
|
|
{complete,
|
|
[
|
|
{Node, #{path := n1}},
|
|
{node3, #{path := n6}},
|
|
{node1, #{path := n8}},
|
|
{node2, #{path := n5}}
|
|
],
|
|
#{dominant := _}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
redundant_coverage_prefer_local_test() ->
|
|
Node = node(),
|
|
Segs = [
|
|
{node1, segment(n1, 0, 20)},
|
|
{Node, segment(n2, 0, 10)},
|
|
{Node, segment(n3, 10, 10)},
|
|
{node2, segment(n4, 20, 20)},
|
|
{Node, segment(n5, 30, 10)},
|
|
{Node, segment(n6, 20, 10)}
|
|
],
|
|
Asm = append_many(new(40), Segs),
|
|
?assertMatch(
|
|
{complete,
|
|
[
|
|
{Node, #{path := n2}},
|
|
{Node, #{path := n3}},
|
|
{Node, #{path := n6}},
|
|
{Node, #{path := n5}}
|
|
],
|
|
#{dominant := Node}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
missing_coverage_test() ->
|
|
Node = node(),
|
|
Segs = [
|
|
{Node, segment(n1, 0, 10)},
|
|
{node1, segment(n3, 10, 20)},
|
|
{Node, segment(n2, 0, 20)},
|
|
{node2, segment(n4, 50, 50)},
|
|
{Node, segment(n5, 40, 60)}
|
|
],
|
|
Asm = append_many(new(100), Segs),
|
|
?assertEqual(
|
|
% {incomplete, {missing, {segment, 30, 40}}} would be more accurate
|
|
{incomplete, {missing, {segment, 30, 100}}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
missing_end_coverage_test() ->
|
|
Node = node(),
|
|
Segs = [
|
|
{Node, segment(n1, 0, 15)},
|
|
{node1, segment(n3, 10, 10)}
|
|
],
|
|
Asm = append_many(new(20), Segs),
|
|
?assertEqual(
|
|
{incomplete, {missing, {segment, 15, 20}}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
missing_coverage_with_redudancy_test() ->
|
|
Segs = [
|
|
{node(), segment(n1, 0, 10)},
|
|
{node(), segment(n2, 0, 20)},
|
|
{node42, segment(n3, 10, 20)},
|
|
{node43, segment(n4, 10, 50)},
|
|
{node(), segment(n5, 40, 60)}
|
|
],
|
|
Asm = append_many(new(100), Segs),
|
|
?assertEqual(
|
|
% {incomplete, {missing, {segment, 50, 60}}}, ???
|
|
{incomplete, {missing, {segment, 60, 100}}},
|
|
status(coverage, Asm)
|
|
).
|
|
|
|
append_many(Asm, List) ->
|
|
lists:foldl(
|
|
fun({Node, Frag}, Acc) -> append(Acc, Node, Frag) end,
|
|
Asm,
|
|
List
|
|
).
|
|
|
|
filemeta(Path, Name) ->
|
|
#{
|
|
path => Path,
|
|
fragment =>
|
|
{filemeta, #{
|
|
name => Name
|
|
}}
|
|
}.
|
|
|
|
segment(Path, Offset, Size) ->
|
|
#{
|
|
path => Path,
|
|
fragment =>
|
|
{segment, #{
|
|
offset => Offset,
|
|
size => Size
|
|
}}
|
|
}.
|
|
|
|
-endif.
|