From d0971ceb53f57f2649a568853db59d138d0fb18c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 14 May 2021 10:42:11 +0200 Subject: [PATCH 1/6] fix: add emqx appup --- src/emqx.appup.src | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b10c14a9c..d162cc15d 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,8 @@ [ {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, @@ -11,6 +13,8 @@ [ {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. {load_module, emqx_metrics, brutal_purge, soft_purge, []} From 30990edbd474dcab81e67f195253d9c5eb83d43c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 12 May 2021 20:32:16 +0200 Subject: [PATCH 2/6] fix(emqx_broker_bench): test real match performance --- src/emqx_broker_bench.erl | 148 ++++++++++++++++++++++++++++---------- 1 file changed, 110 insertions(+), 38 deletions(-) diff --git a/src/emqx_broker_bench.erl b/src/emqx_broker_bench.erl index 45ef0eab6..725a65898 100644 --- a/src/emqx_broker_bench.erl +++ b/src/emqx_broker_bench.erl @@ -18,32 +18,83 @@ -ifdef(EMQX_BENCHMARK). --export([start/1, run1/0, run1/2]). +-export([run/1, run1/0, run1/4]). -run1() -> run1(4, 1000). +-define(T(Expr), timer:tc(fun() -> Expr end)). -run1(Factor, Limit) -> - start(#{factor => Factor, - limit => Limit, - sub_ptn => <<"device/{{id}}/+/{{num}}/#">>, - pub_ptn => <<"device/{{id}}/xays/{{num}}/foo/bar/baz">>}). +run1() -> run1(80, 1000, 80, 10000). + +run1(Subs, SubOps, Pubs, PubOps) -> + run(#{subscribers => Subs, + publishers => Pubs, + sub_ops => SubOps, + pub_ops => PubOps, + sub_ptn => <<"device/{{id}}/+/{{num}}/#">>, + pub_ptn => <<"device/{{id}}/foo/{{num}}/bar">> + }). %% setting fields: -%% - factor: spawn broker-pool-size * factor number of callers -%% - limit: limit the total number of topics for each caller +%% - subscribers: spawn this number of subscriber workers +%% - publishers: spawn this number of publisher workers +%% - sub_ops: the number of subscribes (route insert) each subscriber runs +%% - pub_ops: the number of publish (route lookups) each publisher runs %% - sub_ptn: subscribe topic pattern like a/+/b/+/c/# %% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}} %% replaced by worker id and {{num}} replaced by topic number. %% - pub_ptn: topic pattern used to benchmark publish (match) performance %% e.g. a/x/{{id}}/{{num}}/foo/bar -start(#{factor := Factor} = Settings) -> - BrokerPoolSize = emqx_vm:schedulers() * 2, - Pids = start_callers(BrokerPoolSize * Factor, Settings), - R = collect_results(Pids, #{subscribe => 0, match => 0}), +run(#{subscribers := Subs, + publishers := Pubs, + sub_ops := SubOps, + pub_ops := PubOps + } = Settings) -> + SubsPids = start_callers(Subs, fun start_subscriber/1, Settings), + PubsPids = start_callers(Pubs, fun start_publisher/1, Settings), + _ = collect_results(SubsPids, subscriber_ready), + io:format(user, "subscribe ...~n", []), + {T1, SubsTime} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids), + collect_results(SubsPids, subscribe_time) + end), + io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]), + io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]), + io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]), + + io:format(user, "lookup ...~n", []), + {T2, PubsTime} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids), + collect_results(PubsPids, lookup_time) + end), + io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]), + io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]), + io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]), + io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]), - io:format(user, "~p~n", [erlang:memory()]), - io:format(user, "~p~n", [R]), - lists:foreach(fun(Pid) -> Pid ! stop end, Pids). + io:format(user, "erlang memory: ~p~n", [erlang:memory()]), + + io:format(user, "unsubscribe ...~n", []), + {T3, ok} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids), + wait_until_empty() + end), + io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]). + +wait_until_empty() -> + case emqx_trie:empty() of + true -> ok; + false -> + timer:sleep(5), + wait_until_empty() + end. + +rps(N, NanoSec) -> N * 1_000_000 / NanoSec. + +ns(T) when T > 1_000_000 -> io_lib:format("~p(s)", [T / 1_000_000]); +ns(T) when T > 1_000 -> io_lib:format("~p(ms)", [T / 1_000]); +ns(T) -> io_lib:format("~p(ns)", [T]). ram_bytes() -> Wordsize = erlang:system_info(wordsize), @@ -56,48 +107,69 @@ ram_bytes() -> 0 end. -start_callers(0, _) -> []; -start_callers(N, Settings) -> - [start_caller(Settings#{id => N}) | start_callers(N - 1, Settings)]. +start_callers(N, F, Settings) -> + start_callers(N, F, Settings, []). -collect_results([], R) -> R; -collect_results([Pid | Pids], Acc = #{subscribe := Sr, match := Mr}) -> +start_callers(0, _F, _Settings, Acc) -> + lists:reverse(Acc); +start_callers(N, F, Settings, Acc) -> + start_callers(N - 1, F, Settings, [F(Settings#{id => N}) | Acc]). + +collect_results(Pids, Tag) -> + collect_results(Pids, Tag, 0). + +collect_results([], _Tag, R) -> R; +collect_results([Pid | Pids], Tag, R) -> receive - {Pid, #{subscribe := Srd, match := Mrd}} -> - collect_results(Pids, Acc#{subscribe := Sr + Srd, match := Mr + Mrd}) + {Pid, Tag, N} -> + collect_results(Pids, Tag, N + R) end. -%% ops per second -rps(T, N) -> round(N / (T / 1000000)). - -start_caller(#{id := Id, limit := N, sub_ptn := SubPtn, pub_ptn := PubPtn}) -> +start_subscriber(#{id := Id, sub_ops := N, sub_ptn := SubPtn}) -> Parent = self(), proc_lib:spawn_link( fun() -> SubTopics = make_topics(SubPtn, Id, N), - {Ts, _} = timer:tc(fun() -> subscribe(SubTopics) end), - PubTopics = make_topics(PubPtn, Id, N), - {Tm, _} = timer:tc(fun() -> match(PubTopics) end), - _ = erlang:send(Parent, {self(), #{subscribe => rps(Ts, N), match => rps(Tm, N)}}), + Parent ! {self(), subscriber_ready, 0}, + receive + start_subscribe -> + ok + end, + {Ts, _} = ?T(subscribe(SubTopics)), + _ = erlang:send(Parent, {self(), subscribe_time, Ts/ N}), + %% subscribers should not exit before publish test is done receive stop -> ok end end). -match([]) -> ok; -match([Topic | Topics]) -> - _ = emqx_router:lookup_routes(Topic), - match(Topics). +start_publisher(#{id := Id, pub_ops := N, pub_ptn := PubPtn, subscribers := Subs}) -> + Parent = self(), + proc_lib:spawn_link( + fun() -> + L = lists:seq(1, N), + [Topic] = make_topics(PubPtn, (Id rem Subs) + 1, 1), + receive + start_lookup -> + ok + end, + {Tm, ok} = ?T(lists:foreach(fun(_) -> match(Topic) end, L)), + _ = erlang:send(Parent, {self(), lookup_time, Tm / N}), + ok + end). + +match(Topic) -> + [_] = emqx_router:match_routes(Topic). subscribe([]) -> ok; subscribe([Topic | Rest]) -> ok = emqx_broker:subscribe(Topic), subscribe(Rest). -make_topics(SubPtn0, Id, Limit) -> - SubPtn = emqx_topic:words(SubPtn0), - F = fun(N) -> render(Id, N, SubPtn) end, +make_topics(Ptn0, Id, Limit) -> + Ptn = emqx_topic:words(Ptn0), + F = fun(N) -> render(Id, N, Ptn) end, lists:map(F, lists:seq(1, Limit)). render(ID, N, Ptn) -> From 3c03047c9f94c64185182a1ae91cfe0d0d2b38e9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 15:15:22 +0200 Subject: [PATCH 3/6] fix(emqx_trie): performance issue when many levels --- src/emqx.appup.src | 2 ++ src/emqx_trie.erl | 50 ++++++++++++++++++++++++---------------- test/emqx_trie_SUITE.erl | 25 +++++++++++++++++++- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index d162cc15d..2cac1e315 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -5,6 +5,7 @@ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, @@ -15,6 +16,7 @@ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. {load_module, emqx_metrics, brutal_purge, soft_purge, []} diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 8ece333b0..901da2556 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -219,15 +219,22 @@ do_match(Words) -> do_match(Words, empty). do_match(Words, Prefix) -> - match(is_compact(), Words, Prefix, []). + case is_compact() of + true -> match_compact(Words, Prefix, []); + false -> match_no_compact(Words, Prefix, []) + end. -match(_IsCompact, [], Topic, Acc) -> +match_no_compact([], Topic, Acc) -> 'match_#'(Topic) ++ %% try match foo/bar/# lookup_topic(Topic) ++ %% try match foo/bar Acc; -match(IsCompact, [Word | Words], Prefix, Acc0) -> - case {has_prefix(Prefix), IsCompact} of - {false, false} -> +match_no_compact([Word | Words], Prefix, Acc0) -> + case has_prefix(Prefix) of + true -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), Acc1), + match_no_compact(Words, join(Prefix, Word), Acc); + false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix %% we can simpliy return from here @@ -240,21 +247,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) -> %% then at the second level, we lookup prefix a/x, %% no such prefix to be found, meaning there is no point %% searching for 'a/x/y', 'a/x/+' or 'a/x/#' - Acc0; - _ -> - %% compact paths in database - %% we have to enumerate all possible prefixes - %% e.g. a/+/b/# results with below entries in database - %% - a/+ - %% - a/+/b/# - %% when matching a/x/y, we need to enumerate - %% - a - %% - a/x - %% - a/x/y - %% *with '+', '#' replaced at each level - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1), - match(IsCompact, Words, join(Prefix, Word), Acc) + Acc0 + end. + +match_compact([], Topic, Acc) -> + 'match_#'(Topic) ++ %% try match foo/bar/# + lookup_topic(Topic) ++ %% try match foo/bar + Acc; +match_compact([Word | Words], Prefix, Acc0) -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), Acc1), + WildcardPrefix = join(Prefix, '+'), + %% go deeper to match current_prefix/+ only when: + %% 1. current word is the last + %% OR + %% 2. there is a prefix = 'current_prefix/+' + case Words =:= [] orelse has_prefix(WildcardPrefix) of + true -> match_compact(Words, WildcardPrefix, Acc); + false -> Acc end. 'match_#'(Prefix) -> diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 7516b58a0..23cd1009f 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -105,7 +105,10 @@ t_match3(_) -> Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end), Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]), - ?assertEqual(4, length(Matched)), + case length(Matched) of + 4 -> ok; + _ -> error({unexpected, Matched}) + end, SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>), ?assertEqual([<<"$SYS/#">>], SysMatched). @@ -114,6 +117,26 @@ t_match4(_) -> trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))). +t_match5(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + Topics = [<<"#">>, <>, <>], + trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), + ?assertEqual([<<"#">>, <>], lists:sort(emqx_trie:match(T))), + ?assertEqual([<<"#">>, <>, <>], + lists:sort(emqx_trie:match(<>))). + +t_match6(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + +t_match7(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + t_empty(_) -> ?assert(?TRIE:empty()), trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]), From 55316b3ac36fd0d1a03b8a2839585520e90f3ba6 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 16:31:13 +0200 Subject: [PATCH 4/6] perf: micro optimisation: no lookup for non-wildcard in trie --- src/emqx_trie.erl | 31 ++++++++++++++++++------------- test/emqx_trie_SUITE.erl | 4 ++-- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 901da2556..7146feb74 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -194,6 +194,11 @@ delete_key(Key) -> ok end. +%% micro-optimization: no need to lookup when topic is not wildcard +%% because we only insert wildcards to emqx_trie +lookup_topic(_Topic, false) -> []; +lookup_topic(Topic, true) -> lookup_topic(Topic). + lookup_topic(Topic) when is_binary(Topic) -> case ets:lookup(?TRIE, ?TOPIC(Topic)) of [#?TRIE{count = C}] -> [Topic || C > 0]; @@ -220,20 +225,20 @@ do_match(Words) -> do_match(Words, Prefix) -> case is_compact() of - true -> match_compact(Words, Prefix, []); - false -> match_no_compact(Words, Prefix, []) + true -> match_compact(Words, Prefix, false, []); + false -> match_no_compact(Words, Prefix, false, []) end. -match_no_compact([], Topic, Acc) -> - 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic) ++ %% try match foo/bar +match_no_compact([], Topic, IsWildcard, Acc) -> + 'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/# + lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+ Acc; -match_no_compact([Word | Words], Prefix, Acc0) -> +match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> case has_prefix(Prefix) of true -> Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_no_compact(Words, join(Prefix, '+'), Acc1), - match_no_compact(Words, join(Prefix, Word), Acc); + Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1), + match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc); false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix @@ -250,20 +255,20 @@ match_no_compact([Word | Words], Prefix, Acc0) -> Acc0 end. -match_compact([], Topic, Acc) -> +match_compact([], Topic, IsWildcard, Acc) -> 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic) ++ %% try match foo/bar + lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar Acc; -match_compact([Word | Words], Prefix, Acc0) -> +match_compact([Word | Words], Prefix, IsWildcard, Acc0) -> Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_compact(Words, join(Prefix, Word), Acc1), + Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1), WildcardPrefix = join(Prefix, '+'), %% go deeper to match current_prefix/+ only when: %% 1. current word is the last %% OR %% 2. there is a prefix = 'current_prefix/+' case Words =:= [] orelse has_prefix(WildcardPrefix) of - true -> match_compact(Words, WildcardPrefix, Acc); + true -> match_compact(Words, WildcardPrefix, true, Acc); false -> Acc end. diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 23cd1009f..7ae23b4c6 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -102,11 +102,11 @@ t_match2(_) -> ?assertEqual([], ?TRIE:match(<<"$SYS/broker/zenmq">>)). t_match3(_) -> - Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], + Topics = [<<"d/#">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end), Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]), case length(Matched) of - 4 -> ok; + 3 -> ok; _ -> error({unexpected, Matched}) end, SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>), From 78fd1a80c5348f1f5dba4d13d8fafdf00bf676d7 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 18:32:06 +0200 Subject: [PATCH 5/6] fix(bench): test more publish levels --- src/emqx_broker_bench.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_broker_bench.erl b/src/emqx_broker_bench.erl index 725a65898..b46799eae 100644 --- a/src/emqx_broker_bench.erl +++ b/src/emqx_broker_bench.erl @@ -30,7 +30,7 @@ run1(Subs, SubOps, Pubs, PubOps) -> sub_ops => SubOps, pub_ops => PubOps, sub_ptn => <<"device/{{id}}/+/{{num}}/#">>, - pub_ptn => <<"device/{{id}}/foo/{{num}}/bar">> + pub_ptn => <<"device/{{id}}/foo/{{num}}/bar/1/2/3/4/5">> }). %% setting fields: From ed0ad3e796f8102b41198e57c9b603ca4fe4adaa Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 18:33:54 +0200 Subject: [PATCH 6/6] chore: skip printing memory --- src/emqx_broker_bench.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/emqx_broker_bench.erl b/src/emqx_broker_bench.erl index b46799eae..5aad43cc9 100644 --- a/src/emqx_broker_bench.erl +++ b/src/emqx_broker_bench.erl @@ -72,7 +72,6 @@ run(#{subscribers := Subs, io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]), io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]), - io:format(user, "erlang memory: ~p~n", [erlang:memory()]), io:format(user, "unsubscribe ...~n", []), {T3, ok} =