diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 026ab3b43..46a1a7f51 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -117,7 +117,17 @@ atom() => term() }. --type conn_state() :: idle | connecting | connected | reauthenticating | disconnected. +%% init +-type conn_state() :: + idle + %% mqtt connect recved but not acked + | connecting + %% mqtt connect acked + | connected + %% mqtt connected but reauthenticating + | reauthenticating + %% keepalive timeout or connection terminated + | disconnected. -type reply() :: {outgoing, emqx_types:packet()} @@ -1137,10 +1147,11 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> + Channel0 = maybe_publish_will_msg(kicked, Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel + connected -> ensure_disconnected(kicked, Channel0); + _ -> Channel0 end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1422,6 +1433,14 @@ terminate(_, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); +terminate({shutdown, Reason}, Channel) when + Reason =:= expired orelse + Reason =:= takenover orelse + Reason =:= kicked orelse + Reason =:= discarded +-> + Channel1 = maybe_publish_will_msg(Reason, Channel), + run_terminate_hook(Reason, Channel1); terminate(Reason, Channel) -> Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). @@ -2216,9 +2235,19 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg -spec maybe_publish_will_msg(Reason, channel()) -> channel() when - Reason :: kick | sock_closed | {shutdown, atom()}. + Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}. maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> Channel; +maybe_publish_will_msg({shutdown, not_authorized}, Channel) -> + Channel; +maybe_publish_will_msg(not_authorized, Channel) -> + Channel; +maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when + ConnState =:= idle orelse + ConnState =:= connecting orelse + ConnState =:= reauthenticating +-> + Channel; maybe_publish_will_msg( _Reason, Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} @@ -2229,30 +2258,51 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) when - Reason =:= {shutdown, expired} orelse - Reason =:= {shutdown, kicked} + Reason =:= expired orelse + Reason =:= discarded orelse + %% Unsure... + Reason =:= {shutdown, internal_error} orelse + Reason =:= kicked -> - %% Must publish now without delay and cancel the will message timer. + %% For the cases that session MUST be gone. + %% a. expired (session expired) + %% c. discarded (Session ends because another process starts new session with the same clientid) + %% b. kicked. (kicked by operation) + %% d. internal_error (maybe not recoverable) + %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired + %% OR fired but not yet handled DelayedWillTimer = maps:get(will_message, Timers, undefined), DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), _ = publish_will_msg(ClientInfo, WillMsg), Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; maybe_publish_will_msg( - _OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) -> - case maps:get(will_message, Timers, undefined) of - undefined -> - %% May defer the will message publishing - case will_delay_interval(WillMsg) of - 0 -> - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I -> - ensure_timer(will_message, timer:seconds(I), Channel) - end; - %% Will message is already scheduled - _ -> - Channel + %% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not. + %% willmsg publish could be defered. + IsSessionExpirationInProgress = maps:is_key(expire_session, Timers), + IsWillmsgScheduled = maps:is_key(will_message, Timers), + case will_delay_interval(WillMsg) of + 0 -> + %% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed + false = IsWillmsgScheduled, + ok = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined}; + I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> + %% We delay the will message publishing + %% Willmsg will be published whatever which timer fired first + ensure_timer(will_message, timer:seconds(I), Channel); + _ when IsSessionExpirationInProgress andalso IsWillmsgScheduled -> + %% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9]. + Channel; + _I when Reason =:= takenover -> + %% don't see the point to delay the willmsg + Channel; + _I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled -> + Channel; + I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> + %% @FIXME: process may terminate before the timer fired + ensure_timer(will_message, timer:seconds(I), Channel) end. will_delay_interval(WillMsg) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 3f50db6e3..dc755e78c 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -923,6 +923,7 @@ t_session_kicked({init, Config}) when is_list(Config) -> t_session_kicked({'end', Config}) when is_list(Config) -> emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); t_session_kicked(Config) when is_list(Config) -> + emqx_logger:set_log_level(debug), Topic = <<"foo/bar/1">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -953,6 +954,8 @@ t_session_kicked(Config) when is_list(Config) -> %% on if it's picked as the first one for round_robin MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + + ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]), case MsgRec2 of <<"hello3">> -> ?assertEqual(<<"hello1">>, MsgRec1); diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d10d52661..d7b143e43 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -224,6 +224,68 @@ t_takeover_willmsg_clean_session(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_takeover_clean_session_with_delayed_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + %% mqttv5 only + {properties, #{'Will-Delay-Interval' => 10000}} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> and delay-interval 10s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), + {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN: payload <<"willpayload_delay10">> should be published without delay. + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME),