fix(kick): defer willmsg publish when conn terminates

because kick means shutdown connection AND delete session
This commit is contained in:
William Yang 2023-11-10 08:55:59 +01:00
parent 9da4896f57
commit 6243cf0b0c
2 changed files with 72 additions and 30 deletions

View File

@ -1137,11 +1137,10 @@ handle_call(
conninfo = #{proto_ver := ProtoVer} conninfo = #{proto_ver := ProtoVer}
} }
) -> ) ->
Channel0 = maybe_publish_will_msg(kick, Channel),
Channel1 = Channel1 =
case ConnState of case ConnState of
connected -> ensure_disconnected(kicked, Channel0); connected -> ensure_disconnected(kicked, Channel);
_ -> Channel0 _ -> Channel
end, end,
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
true -> true ->
@ -1423,8 +1422,6 @@ terminate(_, #channel{conn_state = idle} = _Channel) ->
ok; ok;
terminate(normal, Channel) -> terminate(normal, Channel) ->
run_terminate_hook(normal, Channel); run_terminate_hook(normal, Channel);
terminate({shutdown, kicked}, Channel) ->
run_terminate_hook(kicked, Channel);
terminate(Reason, Channel) -> terminate(Reason, Channel) ->
Channel1 = maybe_publish_will_msg(Reason, Channel), Channel1 = maybe_publish_will_msg(Reason, Channel),
run_terminate_hook(Reason, Channel1). run_terminate_hook(Reason, Channel1).
@ -2232,7 +2229,8 @@ maybe_publish_will_msg(
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
) when ) when
Reason =:= {shutdown, expired} Reason =:= {shutdown, expired} orelse
Reason =:= {shutdown, kicked}
-> ->
%% Must publish now without delay and cancel the will message timer. %% Must publish now without delay and cancel the will message timer.
DelayedWillTimer = maps:get(will_message, Timers, undefined), DelayedWillTimer = maps:get(will_message, Timers, undefined),

View File

@ -94,7 +94,7 @@ t_takeover(Config) ->
#{client := [CPid2, CPid1]} = FCtx, #{client := [CPid2, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
?assertReceive({'EXIT', CPid2, normal}), ?assertReceive({'EXIT', CPid2, normal}),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("middle: ~p", [Middle]), ct:pal("middle: ~p", [Middle]),
@ -141,7 +141,7 @@ t_takeover_willmsg(Config) ->
), ),
#{client := [CPid2, CPidSub, CPid1]} = FCtx, #{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
@ -189,16 +189,7 @@ t_takeover_willmsg_clean_session(Config) ->
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
%% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
[
{
fun(CTX) ->
timer:sleep(1000),
CTX
end,
[]
}
],
FCtx = lists:foldl( FCtx = lists:foldl(
fun({Fun, Args}, Ctx) -> fun({Fun, Args}, Ctx) ->
@ -209,7 +200,7 @@ t_takeover_willmsg_clean_session(Config) ->
Commands Commands
), ),
#{client := [CPid2, CPidSub, CPid1]} = FCtx, #{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>),
@ -224,6 +215,58 @@ t_takeover_willmsg_clean_session(Config) ->
?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid1)),
ok. ok.
t_kick_session(Config) ->
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
% emqx_logger:set_log_level(debug),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_kick">>},
{will_qos, 1}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_kick">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[
%% kick may fail (not found) without this delay
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% WHEN: client is kicked with kick_session
[{fun kick_client/2, [ClientId]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
%% THEN: payload <<"willpayload_kick">> should be published
{IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>),
?assert(IsWill1),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
%% t_takover_in_cluster(_) -> %% t_takover_in_cluster(_) ->
%% todo. %% todo.
@ -238,6 +281,10 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) ->
end), end),
Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
kick_client(Ctx, ClientId) ->
ok = emqx_cm:kick_session(ClientId),
Ctx.
publish_msg(Ctx, Msg) -> publish_msg(Ctx, Msg) ->
ok = timer:sleep(rand:uniform(?SLEEP)), ok = timer:sleep(rand:uniform(?SLEEP)),
case emqx:publish(Msg) of case emqx:publish(Msg) of
@ -306,23 +353,20 @@ payload(I) ->
-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> -spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) ->
{IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}.
filter_payload(List, Payload) when is_binary(Payload) -> filter_payload(List, Payload) when is_binary(Payload) ->
IsWill = lists:any(
fun
(#{payload := P}) -> Payload == P;
(_) -> false
end,
List
),
Filtered = [ Filtered = [
Msg Msg
|| #{payload := P} = Msg <- List, || #{payload := P} = Msg <- List,
P =/= Payload P =/= Payload
], ],
{IsWill, Filtered}. {length(List) =/= length(Filtered), Filtered}.
%% @doc assert emqtt *client* process exits as expected. %% @doc assert emqtt *client* process exits as expected.
assert_client_exit(Pid, v5) -> assert_client_exit(Pid, v5, takenover) ->
%% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3]
?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}});
assert_client_exit(Pid, v3) -> assert_client_exit(Pid, v3, takenover) ->
?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}). ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}});
assert_client_exit(Pid, v3, kicked) ->
?assertReceive({'EXIT', Pid, _});
assert_client_exit(Pid, v5, kicked) ->
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}).