Merge pull request #8930 from thalesmg/bugfix-will-msg-check-conn-state
fix: check conn state before sending will message
This commit is contained in:
commit
d4b272b0ed
|
@ -4,6 +4,10 @@
|
|||
|
||||
* Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
|
||||
|
||||
## Bug fixes
|
||||
|
||||
* Check ACLs for last will testament topic before publishing the message. [#8930](https://github.com/emqx/emqx/pull/8930)
|
||||
|
||||
# 5.0.8
|
||||
|
||||
## Bug fixes
|
||||
|
|
|
@ -354,12 +354,14 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
|||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||
NChannel1 = NChannel#channel{
|
||||
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||
},
|
||||
case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
|
||||
{ok, Properties, NChannel2} ->
|
||||
process_connect(Properties, NChannel2);
|
||||
%% only store will_msg after successful authn
|
||||
%% fix for: https://github.com/emqx/emqx/issues/8886
|
||||
NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)},
|
||||
process_connect(Properties, NChannel3);
|
||||
{continue, Properties, NChannel2} ->
|
||||
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
|
||||
{error, ReasonCode} ->
|
||||
|
@ -1165,10 +1167,11 @@ handle_call(
|
|||
Channel = #channel{
|
||||
conn_state = ConnState,
|
||||
will_msg = WillMsg,
|
||||
clientinfo = ClientInfo,
|
||||
conninfo = #{proto_ver := ProtoVer}
|
||||
}
|
||||
) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
Channel1 =
|
||||
case ConnState of
|
||||
connected -> ensure_disconnected(kicked, Channel);
|
||||
|
@ -1359,8 +1362,10 @@ handle_timeout(
|
|||
end;
|
||||
handle_timeout(_TRef, expire_session, Channel) ->
|
||||
shutdown(expired, Channel);
|
||||
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
handle_timeout(
|
||||
_TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
|
||||
) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||
handle_timeout(
|
||||
_TRef,
|
||||
|
@ -1434,12 +1439,14 @@ terminate({shutdown, kicked}, Channel) ->
|
|||
run_terminate_hook(kicked, Channel);
|
||||
terminate({shutdown, Reason}, Channel) when
|
||||
Reason =:= discarded;
|
||||
Reason =:= takenover;
|
||||
Reason =:= not_authorized
|
||||
Reason =:= takenover
|
||||
->
|
||||
run_terminate_hook(Reason, Channel);
|
||||
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
|
||||
%% since will_msg is set to undefined as soon as it is published,
|
||||
%% if will_msg still exists when the session is terminated, it
|
||||
%% must be published immediately.
|
||||
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
(Reason =:= expired) andalso persist_if_session(Channel),
|
||||
run_terminate_hook(Reason, Channel).
|
||||
|
||||
|
@ -2098,10 +2105,10 @@ ensure_disconnected(
|
|||
|
||||
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
||||
Channel;
|
||||
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
||||
maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
|
||||
case will_delay_interval(WillMsg) of
|
||||
0 ->
|
||||
ok = publish_will_msg(WillMsg),
|
||||
ok = publish_will_msg(ClientInfo, WillMsg),
|
||||
Channel#channel{will_msg = undefined};
|
||||
I ->
|
||||
ensure_timer(will_timer, timer:seconds(I), Channel)
|
||||
|
@ -2114,9 +2121,23 @@ will_delay_interval(WillMsg) ->
|
|||
0
|
||||
).
|
||||
|
||||
publish_will_msg(Msg) ->
|
||||
_ = emqx_broker:publish(Msg),
|
||||
ok.
|
||||
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
|
||||
case emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
||||
allow ->
|
||||
_ = emqx_broker:publish(Msg),
|
||||
ok;
|
||||
deny ->
|
||||
?tp(
|
||||
warning,
|
||||
last_will_testament_publish_denied,
|
||||
#{
|
||||
client_info => ClientInfo,
|
||||
topic => Topic,
|
||||
message => Msg
|
||||
}
|
||||
),
|
||||
ok
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Disconnect Reason
|
||||
|
|
|
@ -19,9 +19,12 @@
|
|||
-compile(export_all).
|
||||
|
||||
-include("emqx_authz.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
@ -61,10 +64,26 @@ end_per_suite(_Config) ->
|
|||
meck:unload(emqx_resource),
|
||||
ok.
|
||||
|
||||
init_per_testcase(TestCase, Config) when
|
||||
TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament;
|
||||
TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament
|
||||
->
|
||||
{ok, _} = emqx_authz:update(?CMD_REPLACE, []),
|
||||
{ok, _} = emqx:update_config([authorization, deny_action], disconnect),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
{ok, _} = emqx_authz:update(?CMD_REPLACE, []),
|
||||
Config.
|
||||
|
||||
end_per_testcase(TestCase, _Config) when
|
||||
TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament;
|
||||
TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament
|
||||
->
|
||||
{ok, _} = emqx:update_config([authorization, deny_action], ignore),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
set_special_configs(emqx_authz) ->
|
||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||
|
@ -139,6 +158,15 @@ set_special_configs(_App) ->
|
|||
"\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}."
|
||||
>>
|
||||
}).
|
||||
-define(SOURCE7, #{
|
||||
<<"type">> => <<"file">>,
|
||||
<<"enable">> => true,
|
||||
<<"rules">> =>
|
||||
<<
|
||||
"{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n"
|
||||
"{deny, all}."
|
||||
>>
|
||||
}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
|
@ -287,5 +315,87 @@ t_get_enabled_authzs_some_enabled(_Config) ->
|
|||
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
|
||||
?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
|
||||
|
||||
t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
|
||||
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
|
||||
{ok, C} = emqtt:start_link([
|
||||
{username, <<"some_client">>},
|
||||
{will_topic, <<"some_client/lwt">>},
|
||||
{will_payload, <<"should be published">>}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqx:subscribe(<<"some_client/lwt">>),
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
try
|
||||
emqtt:subscribe(C, <<"unauthorized">>),
|
||||
error(should_have_disconnected)
|
||||
catch
|
||||
exit:{{shutdown, tcp_closed}, _} ->
|
||||
ok
|
||||
end,
|
||||
|
||||
receive
|
||||
{deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} ->
|
||||
ok
|
||||
after 2_000 ->
|
||||
error(lwt_not_published)
|
||||
end,
|
||||
|
||||
ok.
|
||||
|
||||
t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
|
||||
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
|
||||
{ok, C} = emqtt:start_link([
|
||||
{username, <<"some_client">>},
|
||||
{will_topic, <<"some_client/lwt">>},
|
||||
{will_payload, <<"should be published">>}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqx:subscribe(<<"some_client/lwt">>),
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
%% disconnect is async
|
||||
Ref = monitor(process, C),
|
||||
emqtt:publish(C, <<"some/topic">>, <<"unauthorized">>),
|
||||
receive
|
||||
{'DOWN', Ref, process, C, _} ->
|
||||
ok
|
||||
after 1_000 ->
|
||||
error(client_should_have_been_disconnected)
|
||||
end,
|
||||
receive
|
||||
{deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} ->
|
||||
ok
|
||||
after 2_000 ->
|
||||
error(lwt_not_published)
|
||||
end,
|
||||
|
||||
ok.
|
||||
|
||||
t_publish_last_will_testament_denied_topic(_Config) ->
|
||||
{ok, C} = emqtt:start_link([
|
||||
{will_topic, <<"$SYS/lwt">>},
|
||||
{will_payload, <<"should not be published">>}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqx:subscribe(<<"$SYS/lwt">>),
|
||||
unlink(C),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
{true, {ok, _}} = ?wait_async_action(
|
||||
exit(C, kill),
|
||||
#{?snk_kind := last_will_testament_publish_denied},
|
||||
1_000
|
||||
),
|
||||
ok = snabbkaffe:stop(),
|
||||
|
||||
receive
|
||||
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
|
||||
error(lwt_should_not_be_published_to_forbidden_topic)
|
||||
after 1_000 ->
|
||||
ok
|
||||
end,
|
||||
|
||||
ok.
|
||||
|
||||
stop_apps(Apps) ->
|
||||
lists:foreach(fun application:stop/1, Apps).
|
||||
|
|
Loading…
Reference in New Issue