Merge pull request #4800 from zmstone/fix-compaction-performance-when-many-levels
Fix trie compaction performance when many levels
This commit is contained in:
commit
5dbf4f9867
|
@ -3,6 +3,9 @@
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
|
{load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
|
{apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
|
||||||
]},
|
]},
|
||||||
|
@ -11,6 +14,9 @@
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
|
{load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_trie, brutal_purge, soft_purge, []},
|
||||||
%% Just load the module. We don't need to change the 'messages.retained'
|
%% Just load the module. We don't need to change the 'messages.retained'
|
||||||
%% and 'messages.delayed' counter type.
|
%% and 'messages.delayed' counter type.
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
|
||||||
|
|
|
@ -18,32 +18,82 @@
|
||||||
|
|
||||||
-ifdef(EMQX_BENCHMARK).
|
-ifdef(EMQX_BENCHMARK).
|
||||||
|
|
||||||
-export([start/1, run1/0, run1/2]).
|
-export([run/1, run1/0, run1/4]).
|
||||||
|
|
||||||
run1() -> run1(4, 1000).
|
-define(T(Expr), timer:tc(fun() -> Expr end)).
|
||||||
|
|
||||||
run1(Factor, Limit) ->
|
run1() -> run1(80, 1000, 80, 10000).
|
||||||
start(#{factor => Factor,
|
|
||||||
limit => Limit,
|
run1(Subs, SubOps, Pubs, PubOps) ->
|
||||||
sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
|
run(#{subscribers => Subs,
|
||||||
pub_ptn => <<"device/{{id}}/xays/{{num}}/foo/bar/baz">>}).
|
publishers => Pubs,
|
||||||
|
sub_ops => SubOps,
|
||||||
|
pub_ops => PubOps,
|
||||||
|
sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
|
||||||
|
pub_ptn => <<"device/{{id}}/foo/{{num}}/bar/1/2/3/4/5">>
|
||||||
|
}).
|
||||||
|
|
||||||
%% setting fields:
|
%% setting fields:
|
||||||
%% - factor: spawn broker-pool-size * factor number of callers
|
%% - subscribers: spawn this number of subscriber workers
|
||||||
%% - limit: limit the total number of topics for each caller
|
%% - publishers: spawn this number of publisher workers
|
||||||
|
%% - sub_ops: the number of subscribes (route insert) each subscriber runs
|
||||||
|
%% - pub_ops: the number of publish (route lookups) each publisher runs
|
||||||
%% - sub_ptn: subscribe topic pattern like a/+/b/+/c/#
|
%% - sub_ptn: subscribe topic pattern like a/+/b/+/c/#
|
||||||
%% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}}
|
%% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}}
|
||||||
%% replaced by worker id and {{num}} replaced by topic number.
|
%% replaced by worker id and {{num}} replaced by topic number.
|
||||||
%% - pub_ptn: topic pattern used to benchmark publish (match) performance
|
%% - pub_ptn: topic pattern used to benchmark publish (match) performance
|
||||||
%% e.g. a/x/{{id}}/{{num}}/foo/bar
|
%% e.g. a/x/{{id}}/{{num}}/foo/bar
|
||||||
start(#{factor := Factor} = Settings) ->
|
run(#{subscribers := Subs,
|
||||||
BrokerPoolSize = emqx_vm:schedulers() * 2,
|
publishers := Pubs,
|
||||||
Pids = start_callers(BrokerPoolSize * Factor, Settings),
|
sub_ops := SubOps,
|
||||||
R = collect_results(Pids, #{subscribe => 0, match => 0}),
|
pub_ops := PubOps
|
||||||
|
} = Settings) ->
|
||||||
|
SubsPids = start_callers(Subs, fun start_subscriber/1, Settings),
|
||||||
|
PubsPids = start_callers(Pubs, fun start_publisher/1, Settings),
|
||||||
|
_ = collect_results(SubsPids, subscriber_ready),
|
||||||
|
io:format(user, "subscribe ...~n", []),
|
||||||
|
{T1, SubsTime} =
|
||||||
|
?T(begin
|
||||||
|
lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids),
|
||||||
|
collect_results(SubsPids, subscribe_time)
|
||||||
|
end),
|
||||||
|
io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]),
|
||||||
|
io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]),
|
||||||
|
io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]),
|
||||||
|
|
||||||
|
io:format(user, "lookup ...~n", []),
|
||||||
|
{T2, PubsTime} =
|
||||||
|
?T(begin
|
||||||
|
lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids),
|
||||||
|
collect_results(PubsPids, lookup_time)
|
||||||
|
end),
|
||||||
|
io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]),
|
||||||
|
io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]),
|
||||||
|
io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]),
|
||||||
|
|
||||||
io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
|
io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
|
||||||
io:format(user, "~p~n", [erlang:memory()]),
|
|
||||||
io:format(user, "~p~n", [R]),
|
io:format(user, "unsubscribe ...~n", []),
|
||||||
lists:foreach(fun(Pid) -> Pid ! stop end, Pids).
|
{T3, ok} =
|
||||||
|
?T(begin
|
||||||
|
lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids),
|
||||||
|
wait_until_empty()
|
||||||
|
end),
|
||||||
|
io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]).
|
||||||
|
|
||||||
|
wait_until_empty() ->
|
||||||
|
case emqx_trie:empty() of
|
||||||
|
true -> ok;
|
||||||
|
false ->
|
||||||
|
timer:sleep(5),
|
||||||
|
wait_until_empty()
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Operations per second: N operations completed in MicroSec microseconds.
%% The elapsed time comes from timer:tc/1 (see the ?T macro), which reports
%% microseconds, hence the 1_000_000 scaling factor.
rps(N, MicroSec) -> N * 1_000_000 / MicroSec.
|
||||||
|
|
||||||
|
%% Render an elapsed time as printable chardata with a unit suffix.
%% T is in microseconds: every caller passes a timer:tc/1 result, or an
%% average derived from one. Values above one second are shown in seconds,
%% values above one millisecond in milliseconds, and everything else in
%% microseconds (previously mislabelled as "ns").
ns(T) when T > 1_000_000 -> io_lib:format("~p(s)", [T / 1_000_000]);
ns(T) when T > 1_000 -> io_lib:format("~p(ms)", [T / 1_000]);
ns(T) -> io_lib:format("~p(us)", [T]).
|
||||||
|
|
||||||
ram_bytes() ->
|
ram_bytes() ->
|
||||||
Wordsize = erlang:system_info(wordsize),
|
Wordsize = erlang:system_info(wordsize),
|
||||||
|
@ -56,48 +106,69 @@ ram_bytes() ->
|
||||||
0
|
0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_callers(0, _) -> [];
|
start_callers(N, F, Settings) ->
|
||||||
start_callers(N, Settings) ->
|
start_callers(N, F, Settings, []).
|
||||||
[start_caller(Settings#{id => N}) | start_callers(N - 1, Settings)].
|
|
||||||
|
|
||||||
collect_results([], R) -> R;
|
start_callers(0, _F, _Settings, Acc) ->
|
||||||
collect_results([Pid | Pids], Acc = #{subscribe := Sr, match := Mr}) ->
|
lists:reverse(Acc);
|
||||||
|
start_callers(N, F, Settings, Acc) ->
|
||||||
|
start_callers(N - 1, F, Settings, [F(Settings#{id => N}) | Acc]).
|
||||||
|
|
||||||
|
collect_results(Pids, Tag) ->
|
||||||
|
collect_results(Pids, Tag, 0).
|
||||||
|
|
||||||
|
collect_results([], _Tag, R) -> R;
|
||||||
|
collect_results([Pid | Pids], Tag, R) ->
|
||||||
receive
|
receive
|
||||||
{Pid, #{subscribe := Srd, match := Mrd}} ->
|
{Pid, Tag, N} ->
|
||||||
collect_results(Pids, Acc#{subscribe := Sr + Srd, match := Mr + Mrd})
|
collect_results(Pids, Tag, N + R)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% ops per second
|
start_subscriber(#{id := Id, sub_ops := N, sub_ptn := SubPtn}) ->
|
||||||
rps(T, N) -> round(N / (T / 1000000)).
|
|
||||||
|
|
||||||
start_caller(#{id := Id, limit := N, sub_ptn := SubPtn, pub_ptn := PubPtn}) ->
|
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
proc_lib:spawn_link(
|
proc_lib:spawn_link(
|
||||||
fun() ->
|
fun() ->
|
||||||
SubTopics = make_topics(SubPtn, Id, N),
|
SubTopics = make_topics(SubPtn, Id, N),
|
||||||
{Ts, _} = timer:tc(fun() -> subscribe(SubTopics) end),
|
Parent ! {self(), subscriber_ready, 0},
|
||||||
PubTopics = make_topics(PubPtn, Id, N),
|
receive
|
||||||
{Tm, _} = timer:tc(fun() -> match(PubTopics) end),
|
start_subscribe ->
|
||||||
_ = erlang:send(Parent, {self(), #{subscribe => rps(Ts, N), match => rps(Tm, N)}}),
|
ok
|
||||||
|
end,
|
||||||
|
{Ts, _} = ?T(subscribe(SubTopics)),
|
||||||
|
_ = erlang:send(Parent, {self(), subscribe_time, Ts/ N}),
|
||||||
|
%% subscribers should not exit before publish test is done
|
||||||
receive
|
receive
|
||||||
stop ->
|
stop ->
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
match([]) -> ok;
|
start_publisher(#{id := Id, pub_ops := N, pub_ptn := PubPtn, subscribers := Subs}) ->
|
||||||
match([Topic | Topics]) ->
|
Parent = self(),
|
||||||
_ = emqx_router:lookup_routes(Topic),
|
proc_lib:spawn_link(
|
||||||
match(Topics).
|
fun() ->
|
||||||
|
L = lists:seq(1, N),
|
||||||
|
[Topic] = make_topics(PubPtn, (Id rem Subs) + 1, 1),
|
||||||
|
receive
|
||||||
|
start_lookup ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{Tm, ok} = ?T(lists:foreach(fun(_) -> match(Topic) end, L)),
|
||||||
|
_ = erlang:send(Parent, {self(), lookup_time, Tm / N}),
|
||||||
|
ok
|
||||||
|
end).
|
||||||
|
|
||||||
|
match(Topic) ->
|
||||||
|
[_] = emqx_router:match_routes(Topic).
|
||||||
|
|
||||||
subscribe([]) -> ok;
|
subscribe([]) -> ok;
|
||||||
subscribe([Topic | Rest]) ->
|
subscribe([Topic | Rest]) ->
|
||||||
ok = emqx_broker:subscribe(Topic),
|
ok = emqx_broker:subscribe(Topic),
|
||||||
subscribe(Rest).
|
subscribe(Rest).
|
||||||
|
|
||||||
make_topics(SubPtn0, Id, Limit) ->
|
make_topics(Ptn0, Id, Limit) ->
|
||||||
SubPtn = emqx_topic:words(SubPtn0),
|
Ptn = emqx_topic:words(Ptn0),
|
||||||
F = fun(N) -> render(Id, N, SubPtn) end,
|
F = fun(N) -> render(Id, N, Ptn) end,
|
||||||
lists:map(F, lists:seq(1, Limit)).
|
lists:map(F, lists:seq(1, Limit)).
|
||||||
|
|
||||||
render(ID, N, Ptn) ->
|
render(ID, N, Ptn) ->
|
||||||
|
|
|
@ -194,6 +194,11 @@ delete_key(Key) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% micro-optimization: no need to lookup when topic is not wildcard
|
||||||
|
%% because we only insert wildcards to emqx_trie
|
||||||
|
lookup_topic(_Topic, false) -> [];
|
||||||
|
lookup_topic(Topic, true) -> lookup_topic(Topic).
|
||||||
|
|
||||||
lookup_topic(Topic) when is_binary(Topic) ->
|
lookup_topic(Topic) when is_binary(Topic) ->
|
||||||
case ets:lookup(?TRIE, ?TOPIC(Topic)) of
|
case ets:lookup(?TRIE, ?TOPIC(Topic)) of
|
||||||
[#?TRIE{count = C}] -> [Topic || C > 0];
|
[#?TRIE{count = C}] -> [Topic || C > 0];
|
||||||
|
@ -219,15 +224,22 @@ do_match(Words) ->
|
||||||
do_match(Words, empty).
|
do_match(Words, empty).
|
||||||
|
|
||||||
do_match(Words, Prefix) ->
|
do_match(Words, Prefix) ->
|
||||||
match(is_compact(), Words, Prefix, []).
|
case is_compact() of
|
||||||
|
true -> match_compact(Words, Prefix, false, []);
|
||||||
|
false -> match_no_compact(Words, Prefix, false, [])
|
||||||
|
end.
|
||||||
|
|
||||||
match(_IsCompact, [], Topic, Acc) ->
|
match_no_compact([], Topic, IsWildcard, Acc) ->
|
||||||
'match_#'(Topic) ++ %% try match foo/bar/#
|
'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
|
||||||
lookup_topic(Topic) ++ %% try match foo/bar
|
lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
|
||||||
Acc;
|
Acc;
|
||||||
match(IsCompact, [Word | Words], Prefix, Acc0) ->
|
match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
||||||
case {has_prefix(Prefix), IsCompact} of
|
case has_prefix(Prefix) of
|
||||||
{false, false} ->
|
true ->
|
||||||
|
Acc1 = 'match_#'(Prefix) ++ Acc0,
|
||||||
|
Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
|
||||||
|
match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
|
||||||
|
false ->
|
||||||
%% non-compact paths in database
|
%% non-compact paths in database
|
||||||
%% if no prefix matches the current topic prefix
|
%% if no prefix matches the current topic prefix
|
||||||
%% we can simply return from here
|
%% we can simply return from here
|
||||||
|
@ -240,21 +252,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) ->
|
||||||
%% then at the second level, we lookup prefix a/x,
|
%% then at the second level, we lookup prefix a/x,
|
||||||
%% no such prefix to be found, meaning there is no point
|
%% no such prefix to be found, meaning there is no point
|
||||||
%% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
|
%% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
|
||||||
Acc0;
|
Acc0
|
||||||
_ ->
|
end.
|
||||||
%% compact paths in database
|
|
||||||
%% we have to enumerate all possible prefixes
|
match_compact([], Topic, IsWildcard, Acc) ->
|
||||||
%% e.g. a/+/b/# results with below entries in database
|
'match_#'(Topic) ++ %% try match foo/bar/#
|
||||||
%% - a/+
|
lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
|
||||||
%% - a/+/b/#
|
Acc;
|
||||||
%% when matching a/x/y, we need to enumerate
|
match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
||||||
%% - a
|
Acc1 = 'match_#'(Prefix) ++ Acc0,
|
||||||
%% - a/x
|
Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
|
||||||
%% - a/x/y
|
WildcardPrefix = join(Prefix, '+'),
|
||||||
%% *with '+', '#' replaced at each level
|
%% go deeper to match current_prefix/+ only when:
|
||||||
Acc1 = 'match_#'(Prefix) ++ Acc0,
|
%% 1. current word is the last
|
||||||
Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1),
|
%% OR
|
||||||
match(IsCompact, Words, join(Prefix, Word), Acc)
|
%% 2. there is a prefix = 'current_prefix/+'
|
||||||
|
case Words =:= [] orelse has_prefix(WildcardPrefix) of
|
||||||
|
true -> match_compact(Words, WildcardPrefix, true, Acc);
|
||||||
|
false -> Acc
|
||||||
end.
|
end.
|
||||||
|
|
||||||
'match_#'(Prefix) ->
|
'match_#'(Prefix) ->
|
||||||
|
|
|
@ -102,10 +102,13 @@ t_match2(_) ->
|
||||||
?assertEqual([], ?TRIE:match(<<"$SYS/broker/zenmq">>)).
|
?assertEqual([], ?TRIE:match(<<"$SYS/broker/zenmq">>)).
|
||||||
|
|
||||||
t_match3(_) ->
|
t_match3(_) ->
|
||||||
Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
|
Topics = [<<"d/#">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
|
||||||
trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
|
trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
|
||||||
Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
|
Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
|
||||||
?assertEqual(4, length(Matched)),
|
case length(Matched) of
|
||||||
|
3 -> ok;
|
||||||
|
_ -> error({unexpected, Matched})
|
||||||
|
end,
|
||||||
SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>),
|
SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>),
|
||||||
?assertEqual([<<"$SYS/#">>], SysMatched).
|
?assertEqual([<<"$SYS/#">>], SysMatched).
|
||||||
|
|
||||||
|
@ -114,6 +117,26 @@ t_match4(_) ->
|
||||||
trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
|
trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
|
||||||
?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))).
|
?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))).
|
||||||
|
|
||||||
|
t_match5(_) ->
|
||||||
|
T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
|
||||||
|
Topics = [<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
|
||||||
|
trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
|
||||||
|
?assertEqual([<<"#">>, <<T/binary, "/#">>], lists:sort(emqx_trie:match(T))),
|
||||||
|
?assertEqual([<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
|
||||||
|
lists:sort(emqx_trie:match(<<T/binary, "/1">>))).
|
||||||
|
|
||||||
|
t_match6(_) ->
|
||||||
|
T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
|
||||||
|
W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>,
|
||||||
|
trans(fun() -> emqx_trie:insert(W) end),
|
||||||
|
?assertEqual([W], emqx_trie:match(T)).
|
||||||
|
|
||||||
|
t_match7(_) ->
|
||||||
|
T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
|
||||||
|
W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>,
|
||||||
|
trans(fun() -> emqx_trie:insert(W) end),
|
||||||
|
?assertEqual([W], emqx_trie:match(T)).
|
||||||
|
|
||||||
t_empty(_) ->
|
t_empty(_) ->
|
||||||
?assert(?TRIE:empty()),
|
?assert(?TRIE:empty()),
|
||||||
trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),
|
trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),
|
||||||
|
|
Loading…
Reference in New Issue