From a1ccf85c66577ba3d606596442ab13f0c006325a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Jan 2024 18:35:49 +0100 Subject: [PATCH] test(routesync): verify that syncer preserves consistency Under a highly concurrent load. Be aware that this testcase is not deterministic. --- apps/emqx/test/emqx_routing_SUITE.erl | 138 +++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 10ca866fe..af8af737b 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_router.hrl"). @@ -29,7 +30,8 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2 + t_routing_schema_switch_v2, + t_concurrent_routing_updates ]. groups() -> @@ -182,6 +184,140 @@ unsubscribe(C, Topic) -> %% +-define(SUBSCRIBE_TOPICS, [ + <<"t/#">>, + <<"t/fixed">>, + <<"t/1/+">>, + <<"t/2/+">>, + <<"t/42/+/+">>, + <<"client/${i}/+">>, + <<"client/${i}/fixed">>, + <<"client/${i}/#">>, + <<"rand/${r}/+">>, + <<"rand/${r}/fixed">> +]). + +t_concurrent_routing_updates(init, Config) -> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + config => #{broker => #{routing => #{storage_schema => v2}}}, + before_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([node, broker_pool_size], 2) + end + }} + ], + #{work_dir => WorkDir} + ), + ok = snabbkaffe:start_trace(), + [{tc_apps, Apps} | Config]; +t_concurrent_routing_updates('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_suite:stop(?config(tc_apps, Config)). + +t_concurrent_routing_updates(_Config) -> + NClients = 400, + NRTopics = 250, + MCommands = 8, + Port = get_mqtt_tcp_port(node()), + Clients = [ + spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics]) + || I <- lists:seq(1, NClients) + ], + ok = lists:foreach(fun ping_concurrent_client/1, Clients), + ok = timer:sleep(200), + Subscribers = ets:tab2list(?SUBSCRIBER), + Topics = maps:keys(maps:from_list(Subscribers)), + ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())), + ok = lists:foreach(fun stop_concurrent_client/1, Clients), + ok = timer:sleep(1000), + ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]), + ?assertEqual([], ets:tab2list(?SUBSCRIBER)), + ?assertEqual([], emqx_router:topics()). + +run_concurrent_client(I, Port, MCommands, NRTopics) -> + % _ = rand:seed(default, I), + Ctx = #{ + i => I, + r => rand:uniform(NRTopics) + }, + {ok, C} = emqtt:start_link(#{port => Port, clientid => render("client:${i}", Ctx)}), + {ok, _Props} = emqtt:connect(C), + NCommands = rand:uniform(MCommands), + Commands = gen_concurrent_client_plan(NCommands, Ctx), + ok = subscribe_concurrent_client(C, Commands), + run_concurrent_client_loop(C). + +gen_concurrent_client_plan(N, Ctx) -> + lists:foldl( + fun(_, Acc) -> mixin(pick_random_command(Ctx), Acc) end, + [], + lists:seq(1, N) + ). + +subscribe_concurrent_client(C, Commands) -> + lists:foreach( + fun + ({subscribe, Topic}) -> + {ok, _Props, [0]} = emqtt:subscribe(C, Topic); + ({unsubscribe, Topic}) -> + {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic) + end, + Commands + ). + +pick_random_command(Ctx) -> + Topic = render(randpick(?SUBSCRIBE_TOPICS), Ctx), + randpick([ + [{subscribe, Topic}], + [{subscribe, Topic}, {unsubscribe, Topic}] + ]). + +render(Template, Ctx) -> + iolist_to_binary(emqx_template:render_strict(emqx_template:parse(Template), Ctx)). + +run_concurrent_client_loop(C) -> + receive + {From, Ref, F} -> + Reply = F(C), + From ! {Ref, Reply}, + run_concurrent_client_loop(C) + end. + +ping_concurrent_client(Pid) -> + Ref = make_ref(), + Pid ! {self(), Ref, fun emqtt:ping/1}, + receive + {Ref, Reply} -> Reply + after 5000 -> + error(timeout) + end. + +stop_concurrent_client(Pid) -> + MRef = erlang:monitor(process, Pid), + true = erlang:unlink(Pid), + true = erlang:exit(Pid, shutdown), + receive + {'DOWN', MRef, process, Pid, Reason} -> Reason + end. + +randpick(List) -> + lists:nth(rand:uniform(length(List)), List). + +mixin(L = [H | T], Into = [HInto | TInto]) -> + case rand:uniform(length(Into) + 1) of + 1 -> [H | mixin(T, Into)]; + _ -> [HInto | mixin(L, TInto)] + end; +mixin(L, Into) -> + L ++ Into. + +%% + t_routing_schema_switch_v1(Config) -> WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), t_routing_schema_switch(_From = v2, _To = v1, WorkDir).