Fix reload acl module and clean acl caches (#3409)
This commit is contained in:
parent
7cf97acddd
commit
7ba801c8d4
|
@ -58,6 +58,8 @@
|
|||
, lookup_channels/2
|
||||
]).
|
||||
|
||||
-export([all_channels/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
|
@ -327,6 +329,11 @@ with_channel(ClientId, Fun) ->
|
|||
Pids -> Fun(lists:last(Pids))
|
||||
end.
|
||||
|
||||
%% @doc Get all channels registed.
|
||||
all_channels() ->
|
||||
Pat = [{{'_', '$1'}, [], ['$1']}],
|
||||
ets:select(?CHAN_TAB, Pat).
|
||||
|
||||
%% @doc Lookup channels.
|
||||
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
||||
lookup_channels(ClientId) ->
|
||||
|
|
|
@ -51,7 +51,10 @@ unload(_Env) ->
|
|||
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
|
||||
|
||||
reload(_Env) ->
|
||||
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
|
||||
emqx_acl_cache:is_enabled() andalso (
|
||||
lists:foreach(
|
||||
fun(Pid) -> erlang:send(Pid, clean_acl_cache) end,
|
||||
emqx_cm:all_channels())),
|
||||
unload([]), load([]).
|
||||
|
||||
description() ->
|
||||
|
|
|
@ -31,13 +31,11 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
t_clean_acl_cache(_Config) ->
|
||||
t_clean_acl_cache(_) ->
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||
|
@ -57,6 +55,58 @@ t_clean_acl_cache(_Config) ->
|
|||
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
|
||||
emqtt:stop(Client).
|
||||
|
||||
% optimize??
|
||||
t_reload_aclfile_and_cleanall(Config) ->
|
||||
|
||||
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
|
||||
disconnected => fun(_) -> ok end,
|
||||
publish => fun(_) -> ok end } end,
|
||||
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, {msg_handler, RasieMsg()}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
|
||||
{ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||
|
||||
%% Success publish to broker
|
||||
receive
|
||||
{puback, #{packet_id := PktId, reason_code := Rc}} ->
|
||||
?assertEqual(16#10, Rc);
|
||||
_ ->
|
||||
?assert(false)
|
||||
end,
|
||||
|
||||
%% Check acl cache list
|
||||
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
|
||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
||||
|
||||
%% Update acl file and reload mod_acl_internal
|
||||
Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]),
|
||||
ok = file:write_file(Path, <<"{deny, all}.">>),
|
||||
OldPath = emqx:get_env(acl_file),
|
||||
application:set_env(emqx, acl_file, Path),
|
||||
|
||||
emqx_mod_acl_internal:reload([]),
|
||||
|
||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0),
|
||||
{ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||
|
||||
receive
|
||||
{puback, #{packet_id := PktId2, reason_code := Rc2}} ->
|
||||
%% Not authorized
|
||||
?assertEqual(16#87, Rc2);
|
||||
_ ->
|
||||
?assert(false)
|
||||
end,
|
||||
application:set_env(emqx, acl_file, OldPath),
|
||||
file:delete(Path),
|
||||
emqx_mod_acl_internal:reload([]),
|
||||
emqtt:stop(Client).
|
||||
|
||||
%% @private
|
||||
testdir(DataPath) ->
|
||||
Ls = filename:split(DataPath),
|
||||
filename:join(lists:sublist(Ls, 1, length(Ls) - 1)).
|
||||
|
||||
% t_cache_k(_) ->
|
||||
% error('TODO').
|
||||
|
||||
|
|
|
@ -133,6 +133,9 @@ t_kick_session(_) ->
|
|||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
ok = meck:unload(emqx_connection).
|
||||
|
||||
t_all_channels(_) ->
|
||||
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
||||
|
||||
t_lock_clientid(_) ->
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
|
|
|
@ -75,3 +75,4 @@ t_cleanup_channels(_) ->
|
|||
ct:sleep(100),
|
||||
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
|
||||
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).
|
||||
|
||||
|
|
|
@ -24,11 +24,9 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
|
||||
emqx_ct_helpers:boot_modules([]),
|
||||
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
|
||||
Config.
|
||||
|
||||
|
||||
set_sepecial_cfg(_) ->
|
||||
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue