fix: check for authorization on topic before publishing last will testament

fixes #8978

Without checking for authorization, a client can, on abnormal
termination, publish a message to any topic, including `$SYS` ones.
This commit is contained in:
Thales Macedo Garitezi 2022-09-16 14:39:58 -03:00
parent dca522d7d3
commit c20ad3733a
3 changed files with 76 additions and 16 deletions

View File

@ -4,6 +4,10 @@
* Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
## Bug fixes
* Check ACLs for last will testament topic before publishing the message. [#8930](https://github.com/emqx/emqx/pull/8930)
# 5.0.8
## Bug fixes

View File

@ -1167,10 +1167,11 @@ handle_call(
Channel = #channel{
conn_state = ConnState,
will_msg = WillMsg,
clientinfo = ClientInfo,
conninfo = #{proto_ver := ProtoVer}
}
) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
Channel1 =
case ConnState of
connected -> ensure_disconnected(kicked, Channel);
@ -1361,8 +1362,10 @@ handle_timeout(
end;
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
handle_timeout(
_TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
handle_timeout(
_TRef,
@ -1439,11 +1442,11 @@ terminate({shutdown, Reason}, Channel) when
Reason =:= takenover
->
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
%% since will_msg is set to undefined as soon as it is published,
%% if will_msg still exists when the session is terminated, it
%% must be published immediately.
WillMsg =/= undefined andalso publish_will_msg(WillMsg),
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
(Reason =:= expired) andalso persist_if_session(Channel),
run_terminate_hook(Reason, Channel).
@ -2102,10 +2105,10 @@ ensure_disconnected(
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel;
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
case will_delay_interval(WillMsg) of
0 ->
ok = publish_will_msg(WillMsg),
ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined};
I ->
ensure_timer(will_timer, timer:seconds(I), Channel)
@ -2118,9 +2121,23 @@ will_delay_interval(WillMsg) ->
0
).
publish_will_msg(Msg) ->
_ = emqx_broker:publish(Msg),
ok.
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
case emqx_access_control:authorize(ClientInfo, publish, Topic) of
allow ->
_ = emqx_broker:publish(Msg),
ok;
deny ->
?tp(
warning,
last_will_testament_publish_denied,
#{
client_info => ClientInfo,
topic => Topic,
message => Msg
}
),
ok
end.
%%--------------------------------------------------------------------
%% Disconnect Reason

View File

@ -24,6 +24,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -157,6 +158,15 @@ set_special_configs(_App) ->
"\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}."
>>
}).
-define(SOURCE7, #{
<<"type">> => <<"file">>,
<<"enable">> => true,
<<"rules">> =>
<<
"{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n"
"{deny, all}."
>>
}).
%%------------------------------------------------------------------------------
%% Testcases
@ -306,12 +316,14 @@ t_get_enabled_authzs_some_enabled(_Config) ->
?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
{ok, C} = emqtt:start_link([
{will_topic, <<"lwt">>},
{username, <<"some_client">>},
{will_topic, <<"some_client/lwt">>},
{will_payload, <<"should be published">>}
]),
{ok, _} = emqtt:connect(C),
ok = emqx:subscribe(<<"lwt">>),
ok = emqx:subscribe(<<"some_client/lwt">>),
process_flag(trap_exit, true),
try
@ -323,7 +335,7 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
end,
receive
{deliver, <<"lwt">>, #message{payload = <<"should be published">>}} ->
{deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} ->
ok
after 2_000 ->
error(lwt_not_published)
@ -332,12 +344,14 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
ok.
t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
{ok, C} = emqtt:start_link([
{will_topic, <<"lwt">>},
{username, <<"some_client">>},
{will_topic, <<"some_client/lwt">>},
{will_payload, <<"should be published">>}
]),
{ok, _} = emqtt:connect(C),
ok = emqx:subscribe(<<"lwt">>),
ok = emqx:subscribe(<<"some_client/lwt">>),
process_flag(trap_exit, true),
%% disconnect is async
@ -350,7 +364,7 @@ t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
error(client_should_have_been_disconnected)
end,
receive
{deliver, <<"lwt">>, #message{payload = <<"should be published">>}} ->
{deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} ->
ok
after 2_000 ->
error(lwt_not_published)
@ -358,5 +372,30 @@ t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
ok.
t_publish_last_will_testament_denied_topic(_Config) ->
{ok, C} = emqtt:start_link([
{will_topic, <<"$SYS/lwt">>},
{will_payload, <<"should not be published">>}
]),
{ok, _} = emqtt:connect(C),
ok = emqx:subscribe(<<"$SYS/lwt">>),
unlink(C),
ok = snabbkaffe:start_trace(),
{true, {ok, _}} = ?wait_async_action(
exit(C, kill),
#{?snk_kind := last_will_testament_publish_denied},
1_000
),
ok = snabbkaffe:stop(),
receive
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
error(lwt_should_not_be_published_to_forbidden_topic)
after 1_000 ->
ok
end,
ok.
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).