diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 1eb3a2ea8..026ab3b43 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1137,11 +1137,10 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> - Channel0 = maybe_publish_will_msg(kick, Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel0); - _ -> Channel0 + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1423,8 +1422,6 @@ terminate(_, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); -terminate({shutdown, kicked}, Channel) -> - run_terminate_hook(kicked, Channel); terminate(Reason, Channel) -> Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). @@ -2232,7 +2229,8 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) when - Reason =:= {shutdown, expired} + Reason =:= {shutdown, expired} orelse + Reason =:= {shutdown, kicked} -> %% Must publish now without delay and cancel the will message timer. DelayedWillTimer = maps:get(will_message, Timers, undefined), diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 2eda29e5d..47e054907 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -94,7 +94,7 @@ t_takeover(Config) -> #{client := [CPid2, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), ?assertReceive({'EXIT', CPid2, normal}), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("middle: ~p", [Middle]), @@ -141,7 +141,7 @@ t_takeover_willmsg(Config) -> ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), @@ -189,16 +189,7 @@ t_takeover_willmsg_clean_session(Config) -> [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ - [ - { - fun(CTX) -> - timer:sleep(1000), - CTX - end, - [] - } - ], + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -209,7 +200,7 @@ t_takeover_willmsg_clean_session(Config) -> Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), @@ -224,6 +215,58 @@ t_takeover_willmsg_clean_session(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_kick_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + % emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_kick">>}, + {will_qos, 1} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_kick">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [ + %% kick may fail (not found) without this delay + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client is kicked with kick_session + [{fun kick_client/2, [ClientId]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + %% THEN: payload <<"willpayload_kick">> should be published + {IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), + ?assert(IsWill1), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + %% t_takover_in_cluster(_) -> %% todo. @@ -238,6 +281,10 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) -> end), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. +kick_client(Ctx, ClientId) -> + ok = emqx_cm:kick_session(ClientId), + Ctx. + publish_msg(Ctx, Msg) -> ok = timer:sleep(rand:uniform(?SLEEP)), case emqx:publish(Msg) of @@ -306,23 +353,20 @@ payload(I) -> -spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. filter_payload(List, Payload) when is_binary(Payload) -> - IsWill = lists:any( - fun - (#{payload := P}) -> Payload == P; - (_) -> false - end, - List - ), Filtered = [ Msg || #{payload := P} = Msg <- List, P =/= Payload ], - {IsWill, Filtered}. + {length(List) =/= length(Filtered), Filtered}. %% @doc assert emqtt *client* process exits as expected. -assert_client_exit(Pid, v5) -> +assert_client_exit(Pid, v5, takenover) -> %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); -assert_client_exit(Pid, v3) -> - ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}). +assert_client_exit(Pid, v3, takenover) -> + ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}); +assert_client_exit(Pid, v3, kicked) -> + ?assertReceive({'EXIT', Pid, _}); +assert_client_exit(Pid, v5, kicked) -> + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}).