fix(ekka): run cleanups on node down events

When using the RLOG DB Backend with Mria, replicant nodes do not
generate `mnesia down` events.  Therefore, cleanup procedures that
some modules do when a node goes down do not work for replicants.

However, replicant do generate `node down` events, so that may be a
safer way to handle cleanup to take into account that type of node.
This commit is contained in:
Thales Macedo Garitezi 2022-03-11 15:21:12 -03:00
parent 24251aea5d
commit 2748c22b0c
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
6 changed files with 169 additions and 17 deletions

View File

@ -122,10 +122,11 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
end),
cleanup_channels(Node),
{noreply, State};
handle_info({membership, {node, down, Node}}, State) ->
cleanup_channels(Node),
{noreply, State};
handle_info({membership, _Event}, State) ->
@ -146,6 +147,12 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
cleanup_channels(Node) ->
global:trans({?LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun do_cleanup_channels/1, [Node])
end).
do_cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)).

View File

@ -21,6 +21,7 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
@ -91,6 +92,7 @@ monitor(Node) when is_atom(Node) ->
%%--------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
ok = ekka:monitor(membership),
_ = mria:wait_for_tables([?ROUTING_NODE]),
{ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
@ -136,11 +138,15 @@ handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node])
end),
ok = mria:dirty_delete(?ROUTING_NODE, Node),
?tp(emqx_router_helper_cleanup_done, #{node => Node}),
{noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
handle_info({membership, {mnesia, down, Node}}, State) ->
handle_info({nodedown, Node}, State);
handle_info({membership, {node, down, Node}}, State) ->
handle_info({nodedown, Node}, State);
handle_info({membership, _Event}, State) ->
{noreply, State};

View File

@ -65,7 +65,7 @@ t_register_unregister_channel(_) ->
emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)).
t_cleanup_channels(_) ->
t_cleanup_channels_mnesia_down(_) ->
ClientId = <<"clientid">>,
ClientId2 = <<"clientid2">>,
emqx_cm_registry:register_channel(ClientId),
@ -76,3 +76,13 @@ t_cleanup_channels(_) ->
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).
t_cleanup_channels_node_down(_) ->
ClientId = <<"clientid">>,
ClientId2 = <<"clientid2">>,
emqx_cm_registry:register_channel(ClientId),
emqx_cm_registry:register_channel(ClientId2),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
emqx_cm_registry ! {membership, {node, down, node()}},
ct:sleep(100),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).

View File

@ -19,18 +19,61 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(ROUTER_HELPER, emqx_router_helper).
-define(ROUTE_TAB, emqx_route).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
DistPid = case net_kernel:nodename() of
ignored ->
%% calling `net_kernel:start' without `epmd'
%% running will result in a failure.
start_epmd(),
{ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]),
Pid;
_ ->
undefined
end,
emqx_common_test_helpers:start_apps([]),
[{dist_pid, DistPid} | Config].
end_per_suite(Config) ->
DistPid = ?config(dist_pid, Config),
case DistPid of
Pid when is_pid(Pid) ->
net_kernel:stop();
_ ->
ok
end,
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(TestCase, Config)
when TestCase =:= t_cleanup_membership_mnesia_down;
TestCase =:= t_cleanup_membership_node_down;
TestCase =:= t_cleanup_monitor_node_down ->
ok = snabbkaffe:start_trace(),
Slave = start_slave(some_node),
[{slave, Slave} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_testcase(TestCase, Config)
when TestCase =:= t_cleanup_membership_mnesia_down;
TestCase =:= t_cleanup_membership_node_down;
TestCase =:= t_cleanup_monitor_node_down ->
Slave = ?config(slave, Config),
stop_slave(Slave),
mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end),
snabbkaffe:stop(),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.
t_monitor(_) ->
ok = emqx_router_helper:monitor({undefined, node()}),
@ -44,7 +87,74 @@ t_mnesia(_) ->
?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
ct:sleep(200).
t_cleanup_membership_mnesia_down(Config) ->
Slave = ?config(slave, Config),
emqx_router:add_route(<<"a/b/c">>, Slave),
emqx_router:add_route(<<"d/e/f">>, node()),
?assertMatch([_, _], emqx_router:topics()),
?wait_async_action(
?ROUTER_HELPER ! {membership, {mnesia, down, Slave}},
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
1_000),
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
t_cleanup_membership_node_down(Config) ->
Slave = ?config(slave, Config),
emqx_router:add_route(<<"a/b/c">>, Slave),
emqx_router:add_route(<<"d/e/f">>, node()),
?assertMatch([_, _], emqx_router:topics()),
?wait_async_action(
?ROUTER_HELPER ! {membership, {node, down, Slave}},
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
1_000),
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
t_cleanup_monitor_node_down(Config) ->
Slave = ?config(slave, Config),
emqx_router:add_route(<<"a/b/c">>, Slave),
emqx_router:add_route(<<"d/e/f">>, node()),
?assertMatch([_, _], emqx_router:topics()),
?wait_async_action(
stop_slave(Slave),
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
1_000),
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
t_message(_) ->
?ROUTER_HELPER ! testing,
gen_server:cast(?ROUTER_HELPER, testing),
gen_server:call(?ROUTER_HELPER, testing).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
start_epmd() ->
[] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
ok.
epmd_path() ->
case os:find_executable("epmd") of
false ->
ct:pal(critical, "Could not find epmd.~n"),
exit(epmd_not_found);
GlobalEpmd ->
GlobalEpmd
end.
start_slave(Name) ->
CommonBeamOpts = "+S 1:1 ", % We want VMs to only occupy a single core
{ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()),
Node.
stop_slave(Node) ->
slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.

View File

@ -64,7 +64,7 @@ tabname(Name) ->
register_channel(Name, ClientId) when is_binary(ClientId) ->
register_channel(Name, {ClientId, self()});
register_channel(Name, {ClientId, ChanPid})
register_channel(Name, {ClientId, ChanPid})
when is_binary(ClientId), is_pid(ChanPid) ->
mria:dirty_write(tabname(Name), record(ClientId, ChanPid)).
@ -113,12 +113,11 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) ->
Tab = tabname(Name),
global:trans(
{?LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
end),
cleanup_channels(Node, Name),
{noreply, State};
handle_info({membership, {node, down, Node}}, State = #{name := Name}) ->
cleanup_channels(Node, Name),
{noreply, State};
handle_info({membership, _Event}, State) ->
@ -138,7 +137,15 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
cleanup_channels(Node, Tab) ->
cleanup_channels(Node, Name) ->
Tab = tabname(Name),
global:trans(
{?LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun do_cleanup_channels/2, [Node, Tab])
end).
do_cleanup_channels(Node, Tab) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun(Chan) ->
mnesia:delete_object(Tab, Chan, write)

View File

@ -72,13 +72,13 @@ t_register_unregister_channel(_) ->
ok = emqx_gateway_cm_registry:unregister_channel(?GWNAME, ?CLIENTID),
?assertEqual(
[],
[],
ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
t_cleanup_channels(Conf) ->
t_cleanup_channels_mnesia_down(Conf) ->
Pid = proplists:get_value(registry, Conf),
emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
?assertEqual(
@ -90,6 +90,18 @@ t_cleanup_channels(Conf) ->
[],
emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
t_cleanup_channels_node_down(Conf) ->
Pid = proplists:get_value(registry, Conf),
emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
?assertEqual(
[self()],
emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)),
Pid ! {membership, {node, down, node()}},
ct:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
t_handle_unexpected_msg(Conf) ->
Pid = proplists:get_value(registry, Conf),
_ = Pid ! unexpected_info,