From 2e0eae54f827d9ab8281c686e1518d6f9f85c7d6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 26 Sep 2022 10:06:27 -0300 Subject: [PATCH] fix(acl): check ACL before publishing last will testament (lwt) message (4.3) --- .../test/emqx_acl_mnesia_SUITE.erl | 56 ++++++++++++++++++- src/emqx_channel.erl | 34 +++++++---- 2 files changed, 78 insertions(+), 12 deletions(-) diff --git a/apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl index 941ebedb9..213f9c278 100644 --- a/apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_acl_mnesia_SUITE.erl @@ -20,6 +20,7 @@ -compile(export_all). -include("emqx_auth_mnesia.hrl"). +-include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -77,15 +78,37 @@ init_per_testcase_migration(_, Config) -> emqx_acl_mnesia_migrator:migrate_records(), Config. +init_per_testcase_other(t_last_will_testament_message_check_acl, Config) -> + OriginalACLNoMatch = application:get_env(emqx, acl_nomatch), + application:set_env(emqx, acl_nomatch, deny), + emqx_mod_acl_internal:unload([]), + %% deny all for this client + ClientID = <<"lwt_client">>, + ok = emqx_acl_mnesia_db:add_acl({clientid, ClientID}, <<"#">>, pubsub, deny), + [ {original_acl_nomatch, OriginalACLNoMatch} + , {clientid, ClientID} + | Config]; +init_per_testcase_other(_TestCase, Config) -> + Config. + init_per_testcase(Case, Config) -> PerTestInitializers = [ fun init_per_testcase_clean/2, fun init_per_testcase_migration/2, - fun init_per_testcase_emqx_hook/2 + fun init_per_testcase_emqx_hook/2, + fun init_per_testcase_other/2 ], lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers). -end_per_testcase(_, Config) -> +end_per_testcase(t_last_will_testament_message_check_acl, Config) -> + emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5), + case ?config(original_acl_nomatch, Config) of + {ok, Original} -> application:set_env(emqx, acl_nomatch, Original); + _ -> ok + end, + emqx_mod_acl_internal:load([]), + ok; +end_per_testcase(_TestCase, Config) -> emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5), Config. @@ -464,6 +487,35 @@ t_rest_api(_Config) -> {ok, Res3} = request_http_rest_list(["$all"]), ?assertMatch([], get_http_data(Res3)). +%% asserts that we check ACL for the LWT topic before publishing the +%% LWT. +t_last_will_testament_message_check_acl(Config) -> + ClientID = ?config(clientid, Config), + {ok, C} = emqtt:start_link([ + {clientid, ClientID}, + {will_topic, <<"$SYS/lwt">>}, + {will_payload, <<"should not be published">>} + ]), + {ok, _} = emqtt:connect(C), + ok = emqx:subscribe(<<"$SYS/lwt">>), + unlink(C), + ok = snabbkaffe:start_trace(), + {true, {ok, _}} = + ?wait_async_action( + exit(C, kill), + #{?snk_kind := last_will_testament_publish_denied}, + 1_000 + ), + ok = snabbkaffe:stop(), + + receive + {deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} -> + error(lwt_should_not_be_published_to_forbidden_topic) + after 1_000 -> + ok + end, + + ok. create_conflicting_records() -> Records = [ diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 5ac212678..b8f3c5b2c 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -951,9 +951,10 @@ return_sub_unsub_ack(Packet, Channel) -> handle_call(kick, Channel = #channel{ conn_state = ConnState, will_msg = WillMsg, + clientinfo = ClientInfo, conninfo = #{proto_ver := ProtoVer} }) -> - (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), Channel1 = case ConnState of connected -> ensure_disconnected(kicked, Channel); _ -> Channel @@ -1102,8 +1103,9 @@ handle_timeout(_TRef, expire_awaiting_rel, handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); -handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> - (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), +handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg, + clientinfo = ClientInfo}) -> + (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; handle_timeout(_TRef, expire_quota_limit, Channel) -> @@ -1159,9 +1161,10 @@ terminate(_Reason, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); -terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> +terminate(Reason, Channel = #channel{will_msg = WillMsg, + clientinfo = ClientInfo}) -> should_publish_will_message(Reason, Channel) - andalso publish_will_msg(WillMsg), + andalso publish_will_msg(ClientInfo, WillMsg), run_terminate_hook(Reason, Channel). run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) -> @@ -1701,10 +1704,11 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg, + clientinfo = ClientInfo}) -> case will_delay_interval(WillMsg) of 0 -> - ok = publish_will_msg(WillMsg), + ok = publish_will_msg(ClientInfo, WillMsg), Channel#channel{will_msg = undefined}; I -> ensure_timer(will_timer, timer:seconds(I), Channel) @@ -1714,9 +1718,19 @@ will_delay_interval(WillMsg) -> maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg, #{}), 0). -publish_will_msg(Msg) -> - _ = emqx_broker:publish(Msg), - ok. +publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) -> + case emqx_access_control:check_acl(ClientInfo, publish, Topic) of + allow -> + _ = emqx_broker:publish(Msg), + ok; + deny -> + ?tp( + warning, + last_will_testament_publish_denied, + #{topic => Topic} + ), + ok + end. %%-------------------------------------------------------------------- %% Disconnect Reason