diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 89d17ae9e..be4bc1e0d 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -122,10 +122,11 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> - global:trans({?LOCK, self()}, - fun() -> - mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) - end), + cleanup_channels(Node), + {noreply, State}; + +handle_info({membership, {node, down, Node}}, State) -> + cleanup_channels(Node), {noreply, State}; handle_info({membership, _Event}, State) -> @@ -146,6 +147,12 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_channels(Node) -> + global:trans({?LOCK, self()}, + fun() -> + mria:transaction(?CM_SHARD, fun do_cleanup_channels/1, [Node]) + end). + +do_cleanup_channels(Node) -> Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)). diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 7203b35e1..3c00ef3cc 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% Mnesia bootstrap @@ -91,6 +92,7 @@ monitor(Node) when is_atom(Node) -> %%-------------------------------------------------------------------- init([]) -> + process_flag(trap_exit, true), ok = ekka:monitor(membership), _ = mria:wait_for_tables([?ROUTING_NODE]), {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}), @@ -136,11 +138,15 @@ handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node]) end), ok = mria:dirty_delete(?ROUTING_NODE, Node), + ?tp(emqx_router_helper_cleanup_done, #{node => Node}), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({nodedown, Node}, State); +handle_info({membership, {node, down, Node}}, State) -> + handle_info({nodedown, Node}, State); + handle_info({membership, _Event}, State) -> {noreply, State}; diff --git a/apps/emqx/test/emqx_cm_registry_SUITE.erl b/apps/emqx/test/emqx_cm_registry_SUITE.erl index 422b7d084..c2ae5b56b 100644 --- a/apps/emqx/test/emqx_cm_registry_SUITE.erl +++ b/apps/emqx/test/emqx_cm_registry_SUITE.erl @@ -65,7 +65,7 @@ t_register_unregister_channel(_) -> emqx_cm_registry:unregister_channel(ClientId), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)). -t_cleanup_channels(_) -> +t_cleanup_channels_mnesia_down(_) -> ClientId = <<"clientid">>, ClientId2 = <<"clientid2">>, emqx_cm_registry:register_channel(ClientId), @@ -76,3 +76,13 @@ t_cleanup_channels(_) -> ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)). +t_cleanup_channels_node_down(_) -> + ClientId = <<"clientid">>, + ClientId2 = <<"clientid2">>, + emqx_cm_registry:register_channel(ClientId), + emqx_cm_registry:register_channel(ClientId2), + ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), + emqx_cm_registry ! {membership, {node, down, node()}}, + ct:sleep(100), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)). diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index acf3f4c09..f00a9b977 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -19,18 +19,61 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(ROUTER_HELPER, emqx_router_helper). +-define(ROUTE_TAB, emqx_route). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + DistPid = case net_kernel:nodename() of + ignored -> + %% calling `net_kernel:start' without `epmd' + %% running will result in a failure. + start_epmd(), + {ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]), + Pid; + _ -> + undefined + end, emqx_common_test_helpers:start_apps([]), + [{dist_pid, DistPid} | Config]. + +end_per_suite(Config) -> + DistPid = ?config(dist_pid, Config), + case DistPid of + Pid when is_pid(Pid) -> + net_kernel:stop(); + _ -> + ok + end, + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(TestCase, Config) + when TestCase =:= t_cleanup_membership_mnesia_down; + TestCase =:= t_cleanup_membership_node_down; + TestCase =:= t_cleanup_monitor_node_down -> + ok = snabbkaffe:start_trace(), + Slave = start_slave(some_node), + [{slave, Slave} | Config]; +init_per_testcase(_TestCase, Config) -> Config. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_testcase(TestCase, Config) + when TestCase =:= t_cleanup_membership_mnesia_down; + TestCase =:= t_cleanup_membership_node_down; + TestCase =:= t_cleanup_monitor_node_down -> + Slave = ?config(slave, Config), + stop_slave(Slave), + mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end), + snabbkaffe:stop(), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. t_monitor(_) -> ok = emqx_router_helper:monitor({undefined, node()}), @@ -44,7 +87,74 @@ t_mnesia(_) -> ?ROUTER_HELPER ! {membership, {mnesia, down, node()}}, ct:sleep(200). +t_cleanup_membership_mnesia_down(Config) -> + Slave = ?config(slave, Config), + emqx_router:add_route(<<"a/b/c">>, Slave), + emqx_router:add_route(<<"d/e/f">>, node()), + ?assertMatch([_, _], emqx_router:topics()), + ?wait_async_action( + ?ROUTER_HELPER ! {membership, {mnesia, down, Slave}}, + #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave}, + 1_000), + ?assertEqual([<<"d/e/f">>], emqx_router:topics()). + +t_cleanup_membership_node_down(Config) -> + Slave = ?config(slave, Config), + emqx_router:add_route(<<"a/b/c">>, Slave), + emqx_router:add_route(<<"d/e/f">>, node()), + ?assertMatch([_, _], emqx_router:topics()), + ?wait_async_action( + ?ROUTER_HELPER ! {membership, {node, down, Slave}}, + #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave}, + 1_000), + ?assertEqual([<<"d/e/f">>], emqx_router:topics()). + +t_cleanup_monitor_node_down(Config) -> + Slave = ?config(slave, Config), + emqx_router:add_route(<<"a/b/c">>, Slave), + emqx_router:add_route(<<"d/e/f">>, node()), + ?assertMatch([_, _], emqx_router:topics()), + ?wait_async_action( + stop_slave(Slave), + #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave}, + 1_000), + ?assertEqual([<<"d/e/f">>], emqx_router:topics()). + t_message(_) -> ?ROUTER_HELPER ! testing, gen_server:cast(?ROUTER_HELPER, testing), gen_server:call(?ROUTER_HELPER, testing). + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +start_epmd() -> + [] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"), + ok. + +epmd_path() -> + case os:find_executable("epmd") of + false -> + ct:pal(critical, "Could not find epmd.~n"), + exit(epmd_not_found); + GlobalEpmd -> + GlobalEpmd + end. + +start_slave(Name) -> + CommonBeamOpts = "+S 1:1 ", % We want VMs to only occupy a single core + {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()), + Node. + +stop_slave(Node) -> + slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch. diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 08f2ba455..a81dc6c3b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -64,7 +64,7 @@ tabname(Name) -> register_channel(Name, ClientId) when is_binary(ClientId) -> register_channel(Name, {ClientId, self()}); -register_channel(Name, {ClientId, ChanPid}) +register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> mria:dirty_write(tabname(Name), record(ClientId, ChanPid)). @@ -113,12 +113,11 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) -> - Tab = tabname(Name), - global:trans( - {?LOCK, self()}, - fun() -> - mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) - end), + cleanup_channels(Node, Name), + {noreply, State}; + +handle_info({membership, {node, down, Node}}, State = #{name := Name}) -> + cleanup_channels(Node, Name), {noreply, State}; handle_info({membership, _Event}, State) -> @@ -138,7 +137,15 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -cleanup_channels(Node, Tab) -> +cleanup_channels(Node, Name) -> + Tab = tabname(Name), + global:trans( + {?LOCK, self()}, + fun() -> + mria:transaction(?CM_SHARD, fun do_cleanup_channels/2, [Node, Tab]) + end). + +do_cleanup_channels(Node, Tab) -> Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], lists:foreach(fun(Chan) -> mnesia:delete_object(Tab, Chan, write) diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl index 0401d9fe6..628a66c0a 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl @@ -72,13 +72,13 @@ t_register_unregister_channel(_) -> ok = emqx_gateway_cm_registry:unregister_channel(?GWNAME, ?CLIENTID), ?assertEqual( - [], + [], ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))), ?assertEqual( [], emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). -t_cleanup_channels(Conf) -> +t_cleanup_channels_mnesia_down(Conf) -> Pid = proplists:get_value(registry, Conf), emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID), ?assertEqual( @@ -90,6 +90,18 @@ t_cleanup_channels(Conf) -> [], emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). +t_cleanup_channels_node_down(Conf) -> + Pid = proplists:get_value(registry, Conf), + emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID), + ?assertEqual( + [self()], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)), + Pid ! {membership, {node, down, node()}}, + ct:sleep(100), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). + t_handle_unexpected_msg(Conf) -> Pid = proplists:get_value(registry, Conf), _ = Pid ! unexpected_info,