From 87ab2e3a2dd00537d66ac1c06362673af0c5d7ee Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 8 Sep 2022 10:13:51 -0300 Subject: [PATCH 1/3] fix: check conn state before sending will message --- apps/emqx/src/emqx_channel.erl | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 686833c45..699c05134 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1434,15 +1434,24 @@ terminate({shutdown, kicked}, Channel) -> run_terminate_hook(kicked, Channel); terminate({shutdown, Reason}, Channel) when Reason =:= discarded; - Reason =:= takenover; - Reason =:= not_authorized + Reason =:= takenover -> run_terminate_hook(Reason, Channel); terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> - (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + should_publish_will_message(Reason, Channel) andalso publish_will_msg(WillMsg), (Reason =:= expired) andalso persist_if_session(Channel), run_terminate_hook(Reason, Channel). +should_publish_will_message(TerminateReason, Channel) -> + not lists:member(TerminateReason, [ + {shutdown, kicked}, + {shutdown, discarded}, + {shutdown, takenover}, + {shutdown, not_authorized} + ]) andalso + not lists:member(info(conn_state, Channel), [idle, connecting]) andalso + info(will_msg, Channel) =/= undefined. + persist_if_session(#channel{session = Session} = Channel) -> case emqx_session:is_session(Session) of true -> From dca522d7d340bfafc02792e2e9a9ef21ab0ec274 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 14 Sep 2022 09:32:59 -0300 Subject: [PATCH 2/3] test: add tests for publishing lwt when deny_action is disconnect --- apps/emqx/src/emqx_channel.erl | 21 +++---- apps/emqx_authz/test/emqx_authz_SUITE.erl | 71 +++++++++++++++++++++++ 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 699c05134..afa5b1cf2 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -354,12 +354,14 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), NChannel1 = NChannel#channel{ - will_msg = emqx_packet:will_msg(NConnPkt), alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) }, case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of {ok, Properties, NChannel2} -> - process_connect(Properties, NChannel2); + %% only store will_msg after successful authn + %% fix for: https://github.com/emqx/emqx/issues/8886 + NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)}, + process_connect(Properties, NChannel3); {continue, Properties, NChannel2} -> handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2); {error, ReasonCode} -> @@ -1438,20 +1440,13 @@ terminate({shutdown, Reason}, Channel) when -> run_terminate_hook(Reason, Channel); terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> - should_publish_will_message(Reason, Channel) andalso publish_will_msg(WillMsg), + %% since will_msg is set to undefined as soon as it is published, + %% if will_msg still exists when the session is terminated, it + %% must be published immediately. + WillMsg =/= undefined andalso publish_will_msg(WillMsg), (Reason =:= expired) andalso persist_if_session(Channel), run_terminate_hook(Reason, Channel). -should_publish_will_message(TerminateReason, Channel) -> - not lists:member(TerminateReason, [ - {shutdown, kicked}, - {shutdown, discarded}, - {shutdown, takenover}, - {shutdown, not_authorized} - ]) andalso - not lists:member(info(conn_state, Channel), [idle, connecting]) andalso - info(will_msg, Channel) =/= undefined. - persist_if_session(#channel{session = Session} = Channel) -> case emqx_session:is_session(Session) of true -> diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 3ee1a94e8..8f8c8fef1 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -19,6 +19,8 @@ -compile(export_all). -include("emqx_authz.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). @@ -61,10 +63,26 @@ end_per_suite(_Config) -> meck:unload(emqx_resource), ok. +init_per_testcase(TestCase, Config) when + TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; + TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament +-> + {ok, _} = emqx_authz:update(?CMD_REPLACE, []), + {ok, _} = emqx:update_config([authorization, deny_action], disconnect), + Config; init_per_testcase(_, Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, []), Config. +end_per_testcase(TestCase, _Config) when + TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; + TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament +-> + {ok, _} = emqx:update_config([authorization, deny_action], ignore), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + set_special_configs(emqx_authz) -> {ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, no_match], deny), @@ -287,5 +305,58 @@ t_get_enabled_authzs_some_enabled(_Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]), ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()). +t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> + {ok, C} = emqtt:start_link([ + {will_topic, <<"lwt">>}, + {will_payload, <<"should be published">>} + ]), + {ok, _} = emqtt:connect(C), + ok = emqx:subscribe(<<"lwt">>), + process_flag(trap_exit, true), + + try + emqtt:subscribe(C, <<"unauthorized">>), + error(should_have_disconnected) + catch + exit:{{shutdown, tcp_closed}, _} -> + ok + end, + + receive + {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} -> + ok + after 2_000 -> + error(lwt_not_published) + end, + + ok. + +t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> + {ok, C} = emqtt:start_link([ + {will_topic, <<"lwt">>}, + {will_payload, <<"should be published">>} + ]), + {ok, _} = emqtt:connect(C), + ok = emqx:subscribe(<<"lwt">>), + process_flag(trap_exit, true), + + %% disconnect is async + Ref = monitor(process, C), + emqtt:publish(C, <<"some/topic">>, <<"unauthorized">>), + receive + {'DOWN', Ref, process, C, _} -> + ok + after 1_000 -> + error(client_should_have_been_disconnected) + end, + receive + {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} -> + ok + after 2_000 -> + error(lwt_not_published) + end, + + ok. + stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps). From c20ad3733af2084602dcaf0415eaea67993fe8f7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 16 Sep 2022 14:39:58 -0300 Subject: [PATCH 3/3] fix: check for authorization on topic before publishing last will testament fixes #8978 Without checking for authorization, a client can, on abnormal termination, publish a message to any topic, including `$SYS` ones. --- CHANGES-5.0.md | 4 ++ apps/emqx/src/emqx_channel.erl | 37 +++++++++++----- apps/emqx_authz/test/emqx_authz_SUITE.erl | 51 ++++++++++++++++++++--- 3 files changed, 76 insertions(+), 16 deletions(-) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index d0f80cc93..39affe867 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -4,6 +4,10 @@ * Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973) +## Bug fixes + +* Check ACLs for last will testament topic before publishing the message. [#8930](https://github.com/emqx/emqx/pull/8930) + # 5.0.8 ## Bug fixes diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index afa5b1cf2..8335a2a5d 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1167,10 +1167,11 @@ handle_call( Channel = #channel{ conn_state = ConnState, will_msg = WillMsg, + clientinfo = ClientInfo, conninfo = #{proto_ver := ProtoVer} } ) -> - (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), Channel1 = case ConnState of connected -> ensure_disconnected(kicked, Channel); @@ -1361,8 +1362,10 @@ handle_timeout( end; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); -handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> - (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), +handle_timeout( + _TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} +) -> + (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; handle_timeout( _TRef, @@ -1439,11 +1442,11 @@ terminate({shutdown, Reason}, Channel) when Reason =:= takenover -> run_terminate_hook(Reason, Channel); -terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> +terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> %% since will_msg is set to undefined as soon as it is published, %% if will_msg still exists when the session is terminated, it %% must be published immediately. - WillMsg =/= undefined andalso publish_will_msg(WillMsg), + WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), (Reason =:= expired) andalso persist_if_session(Channel), run_terminate_hook(Reason, Channel). @@ -2102,10 +2105,10 @@ ensure_disconnected( maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> - ok = publish_will_msg(WillMsg), + ok = publish_will_msg(ClientInfo, WillMsg), Channel#channel{will_msg = undefined}; I -> ensure_timer(will_timer, timer:seconds(I), Channel) @@ -2118,9 +2121,23 @@ will_delay_interval(WillMsg) -> 0 ). -publish_will_msg(Msg) -> - _ = emqx_broker:publish(Msg), - ok. +publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) -> + case emqx_access_control:authorize(ClientInfo, publish, Topic) of + allow -> + _ = emqx_broker:publish(Msg), + ok; + deny -> + ?tp( + warning, + last_will_testament_publish_denied, + #{ + client_info => ClientInfo, + topic => Topic, + message => Msg + } + ), + ok + end. %%-------------------------------------------------------------------- %% Disconnect Reason diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 8f8c8fef1..f602acedc 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -157,6 +158,15 @@ set_special_configs(_App) -> "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}." >> }). +-define(SOURCE7, #{ + <<"type">> => <<"file">>, + <<"enable">> => true, + <<"rules">> => + << + "{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n" + "{deny, all}." + >> +}). %%------------------------------------------------------------------------------ %% Testcases @@ -306,12 +316,14 @@ t_get_enabled_authzs_some_enabled(_Config) -> ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()). t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), {ok, C} = emqtt:start_link([ - {will_topic, <<"lwt">>}, + {username, <<"some_client">>}, + {will_topic, <<"some_client/lwt">>}, {will_payload, <<"should be published">>} ]), {ok, _} = emqtt:connect(C), - ok = emqx:subscribe(<<"lwt">>), + ok = emqx:subscribe(<<"some_client/lwt">>), process_flag(trap_exit, true), try @@ -323,7 +335,7 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> end, receive - {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} -> + {deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} -> ok after 2_000 -> error(lwt_not_published) @@ -332,12 +344,14 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> ok. t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), {ok, C} = emqtt:start_link([ - {will_topic, <<"lwt">>}, + {username, <<"some_client">>}, + {will_topic, <<"some_client/lwt">>}, {will_payload, <<"should be published">>} ]), {ok, _} = emqtt:connect(C), - ok = emqx:subscribe(<<"lwt">>), + ok = emqx:subscribe(<<"some_client/lwt">>), process_flag(trap_exit, true), %% disconnect is async @@ -350,7 +364,7 @@ t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> error(client_should_have_been_disconnected) end, receive - {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} -> + {deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} -> ok after 2_000 -> error(lwt_not_published) @@ -358,5 +372,30 @@ t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> ok. +t_publish_last_will_testament_denied_topic(_Config) -> + {ok, C} = emqtt:start_link([ + {will_topic, <<"$SYS/lwt">>}, + {will_payload, <<"should not be published">>} + ]), + {ok, _} = emqtt:connect(C), + ok = emqx:subscribe(<<"$SYS/lwt">>), + unlink(C), + ok = snabbkaffe:start_trace(), + {true, {ok, _}} = ?wait_async_action( + exit(C, kill), + #{?snk_kind := last_will_testament_publish_denied}, + 1_000 + ), + ok = snabbkaffe:stop(), + + receive + {deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} -> + error(lwt_should_not_be_published_to_forbidden_topic) + after 1_000 -> + ok + end, + + ok. + stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps).