From 5024304bf9dc83e02567c2cb4114e5486011bfc6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 14:37:46 +0400 Subject: [PATCH] fix(router): wait for tables replicate before choosing schema vsn --- apps/emqx/src/emqx_router.erl | 2 + apps/emqx/src/emqx_trie.erl | 5 + apps/emqx/test/emqx_routing_SUITE.erl | 130 ++++++++++++++++++++++---- 3 files changed, 117 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 092af06da..2cd5ffb96 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -465,6 +465,8 @@ get_schema_vsn() -> -spec init_schema() -> ok. init_schema() -> + ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), + ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), Schema = choose_schema_vsn(ConfSchema), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 229a0e3f4..76be97d3e 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -21,6 +21,7 @@ %% Mnesia bootstrap -export([ mnesia/1, + wait_for_tables/0, create_session_trie/1 ]). @@ -105,6 +106,10 @@ create_session_trie(Type) -> ] ). +-spec wait_for_tables() -> ok | {error, _Reason}. +wait_for_tables() -> + mria:wait_for_tables([?TRIE]). + %%-------------------------------------------------------------------- %% Topics APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index dbc4dc193..6966ac56a 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -26,11 +26,15 @@ all() -> [ {group, routing_schema_v1}, - {group, routing_schema_v2} + {group, routing_schema_v2}, + t_routing_schema_switch_v1, + t_routing_schema_switch_v2 ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + TCs = [ + t_cluster_routing + ], [ {routing_schema_v1, [], TCs}, {routing_schema_v2, [], TCs} @@ -39,28 +43,38 @@ groups() -> init_per_group(GroupName, Config) -> WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), NodeSpecs = [ - {emqx_routing_SUITE1, #{apps => mk_appspecs(GroupName, 1), role => core}}, - {emqx_routing_SUITE2, #{apps => mk_appspecs(GroupName, 2), role => core}}, - {emqx_routing_SUITE3, #{apps => mk_appspecs(GroupName, 3), role => replicant}} + {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}}, + {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}}, + {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), - [{cluster, Nodes}, Config]. + [{cluster, Nodes} | Config]. end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). -mk_appspecs(GroupName, N) -> - [ - {emqx, #{ - config => mk_config(GroupName, N), - after_start => fun() -> - % NOTE - % This one is actually defined on `emqx_conf_schema` level, but used - % in `emqx_broker`. Thus we have to resort to this ugly hack. - emqx_config:force_put([rpc, mode], async) - end - }} - ]. +init_per_testcase(TC, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]), + [{work_dir, WorkDir} | Config]. + +end_per_testcase(_TC, _Config) -> + ok. + +mk_emqx_appspec(GroupName, N) -> + {emqx, #{ + config => mk_config(GroupName, N), + after_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([rpc, mode], async) + end + }}. + +mk_genrpc_appspec() -> + {gen_rpc, #{ + override_env => [{port_discovery, stateless}] + }}. mk_config(GroupName, N) -> #{ @@ -68,9 +82,9 @@ mk_config(GroupName, N) -> listeners => mk_config_listeners(N) }. -mk_config_broker(routing_schema_v1) -> +mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 -> #{routing => #{storage_schema => v1}}; -mk_config_broker(routing_schema_v2) -> +mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 -> #{routing => #{storage_schema => v2}}. mk_config_listeners(N) -> @@ -82,6 +96,8 @@ mk_config_listeners(N) -> wss => #{default => #{enable => false}} }. +%% + t_cluster_routing(Config) -> Cluster = ?config(cluster, Config), Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster], @@ -163,6 +179,80 @@ unsubscribe(C, Topic) -> {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic), ok = timer:sleep(200). +%% + +t_routing_schema_switch_v1(Config) -> + t_routing_schema_switch(_From = v2, _To = v1, Config). + +t_routing_schema_switch_v2(Config) -> + t_routing_schema_switch(_From = v1, _To = v2, Config). + +t_routing_schema_switch(VFrom, VTo, Config) -> + % Start first node with routing schema VTo (e.g. v1) + WorkDir = ?config(work_dir, Config), + [Node1] = emqx_cth_cluster:start( + [ + {routing_schema_switch1, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)] + }} + ], + #{work_dir => WorkDir} + ), + % Ensure there's at least 1 route on Node1 + C1 = start_client(Node1), + ok = subscribe(C1, <<"a/+/c">>), + ok = subscribe(C1, <<"d/e/f/#">>), + % Start rest of nodes with routing schema VFrom (e.g. v2) + [Node2, Node3] = emqx_cth_cluster:start( + [ + {routing_schema_switch2, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)], + base_port => 20000, + join_to => Node1 + }}, + {routing_schema_switch3, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)], + base_port => 20100, + join_to => Node1 + }} + ], + #{work_dir => WorkDir} + ), + % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively + Nodes = [Node1, Node2, Node3], + ?assertEqual( + [{ok, VTo}, {ok, VTo}, {ok, VTo}], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertMatch( + [{ok, [Node1, Node2, Node3]}], + lists:usort(erpc:multicall(Nodes, emqx, running_nodes, [])) + ) + ), + % Verify that routing works as expected + C2 = start_client(Node2), + ok = subscribe(C2, <<"a/+/d">>), + C3 = start_client(Node3), + ok = subscribe(C3, <<"d/e/f/#">>), + {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>), + {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>), + {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>), + ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}), + ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}), + ?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertNotReceive(_), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3), + ok = emqx_cth_cluster:stop(Nodes). + +%% + get_mqtt_tcp_port(Node) -> {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port.