Merge pull request #9064 from thalesmg/bugfix-lwt-acl-4214
fix(acl): check ACL before publishing last will testament (lwt) message (4.2)
This commit is contained in:
commit
f6e0498cb3
|
@ -1012,8 +1012,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
handle_timeout(_TRef, expire_session, Channel) ->
|
handle_timeout(_TRef, expire_session, Channel) ->
|
||||||
shutdown(expired, Channel);
|
shutdown(expired, Channel);
|
||||||
|
|
||||||
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg,
|
||||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
clientinfo = ClientInfo}) ->
|
||||||
|
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||||
|
|
||||||
handle_timeout(_TRef, expire_quota_limit, Channel) ->
|
handle_timeout(_TRef, expire_quota_limit, Channel) ->
|
||||||
|
@ -1076,8 +1077,9 @@ terminate(normal, Channel) ->
|
||||||
terminate({shutdown, Reason}, Channel)
|
terminate({shutdown, Reason}, Channel)
|
||||||
when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
|
when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
|
||||||
run_terminate_hook(Reason, Channel);
|
run_terminate_hook(Reason, Channel);
|
||||||
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
terminate(Reason, Channel = #channel{will_msg = WillMsg,
|
||||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
clientinfo = ClientInfo}) ->
|
||||||
|
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||||
run_terminate_hook(Reason, Channel).
|
run_terminate_hook(Reason, Channel).
|
||||||
|
|
||||||
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
||||||
|
@ -1587,9 +1589,10 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||||
|
|
||||||
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
||||||
Channel;
|
Channel;
|
||||||
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg,
|
||||||
|
clientinfo = ClientInfo}) ->
|
||||||
case will_delay_interval(WillMsg) of
|
case will_delay_interval(WillMsg) of
|
||||||
0 -> publish_will_msg(WillMsg),
|
0 -> publish_will_msg(ClientInfo, WillMsg),
|
||||||
Channel#channel{will_msg = undefined};
|
Channel#channel{will_msg = undefined};
|
||||||
I -> ensure_timer(will_timer, timer:seconds(I), Channel)
|
I -> ensure_timer(will_timer, timer:seconds(I), Channel)
|
||||||
end.
|
end.
|
||||||
|
@ -1597,7 +1600,15 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
||||||
will_delay_interval(WillMsg) ->
|
will_delay_interval(WillMsg) ->
|
||||||
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
|
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
|
||||||
|
|
||||||
publish_will_msg(Msg) -> emqx_broker:publish(Msg).
|
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
|
||||||
|
case emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
||||||
|
allow ->
|
||||||
|
_ = emqx_broker:publish(Msg),
|
||||||
|
ok;
|
||||||
|
deny ->
|
||||||
|
?LOG(warning, "Last will testament publish denied: topic ~p~n", [Topic]),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Disconnect Reason
|
%% Disconnect Reason
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include("emqx.hrl").
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
@ -31,6 +33,21 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_testcase(t_last_will_testament_message_check_acl, Config) ->
|
||||||
|
OriginalACLNomatch = emqx_zone:get_env(external, acl_nomatch),
|
||||||
|
emqx_zone:set_env(external, acl_nomatch, deny),
|
||||||
|
[ {original_acl_nomatch, OriginalACLNomatch}
|
||||||
|
| Config];
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_last_will_testament_message_check_acl, Config) ->
|
||||||
|
OriginalACLNomatch = ?config(original_acl_nomatch, Config),
|
||||||
|
emqx_zone:set_env(external, acl_nomatch, OriginalACLNomatch),
|
||||||
|
Config;
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -80,6 +97,28 @@ t_reload_aclfile_and_cleanall(_Config) ->
|
||||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
|
||||||
|
%% asserts that we check ACL for the LWT topic before publishing the
|
||||||
|
%% LWT.
|
||||||
|
t_last_will_testament_message_check_acl(_Config) ->
|
||||||
|
ClientID = <<"lwt_acl">>,
|
||||||
|
{ok, C} = emqtt:start_link([
|
||||||
|
{clientid, ClientID},
|
||||||
|
{will_topic, <<"$SYS/lwt">>},
|
||||||
|
{will_payload, <<"should not be published">>}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
ok = emqx:subscribe(<<"$SYS/lwt">>),
|
||||||
|
unlink(C),
|
||||||
|
exit(C, kill),
|
||||||
|
receive
|
||||||
|
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
|
||||||
|
error(lwt_should_not_be_published_to_forbidden_topic)
|
||||||
|
after 1000 ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
testdir(DataPath) ->
|
testdir(DataPath) ->
|
||||||
Ls = filename:split(DataPath),
|
Ls = filename:split(DataPath),
|
||||||
|
@ -120,4 +159,3 @@ testdir(DataPath) ->
|
||||||
|
|
||||||
% t_is_enabled(_) ->
|
% t_is_enabled(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue