fix: maybe send willmsg

This commit is contained in:
William Yang 2023-11-15 13:55:26 +01:00
parent 6311b582ec
commit e5a3574d89
2 changed files with 421 additions and 90 deletions

View File

@ -145,6 +145,7 @@
). ).
-define(LIMITER_ROUTING, message_routing). -define(LIMITER_ROUTING, message_routing).
-define(chan_terminating, chan_terminating).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@ -873,6 +874,7 @@ do_unsubscribe(
%% MQTT-v5.0: 3.14.4 DISCONNECT Actions %% MQTT-v5.0: 3.14.4 DISCONNECT Actions
maybe_clean_will_msg(?RC_SUCCESS, Channel) -> maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
%% [MQTT-3.14.4-3]
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
maybe_clean_will_msg(_ReasonCode, Channel) -> maybe_clean_will_msg(_ReasonCode, Channel) ->
Channel. Channel.
@ -1165,7 +1167,8 @@ handle_call(
shutdown(kicked, ok, Channel1) shutdown(kicked, ok, Channel1)
end; end;
handle_call(discard, Channel) -> handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel); Channel0 = maybe_publish_will_msg(discarded, Channel),
disconnect_and_shutdown(discarded, ok, Channel0);
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
reply(Session, Channel#channel{takeover = true}); reply(Session, Channel#channel{takeover = true});
@ -1188,7 +1191,8 @@ handle_call(
emqx_channel_takeover_end, emqx_channel_takeover_end,
#{clientid => ClientId} #{clientid => ClientId}
), ),
disconnect_and_shutdown(takenover, AllPendings, Channel); Channel0 = maybe_publish_will_msg(takenover, Channel),
disconnect_and_shutdown(takenover, AllPendings, Channel0);
handle_call(list_authz_cache, Channel) -> handle_call(list_authz_cache, Channel) ->
{reply, emqx_authz_cache:list_authz_cache(), Channel}; {reply, emqx_authz_cache:list_authz_cache(), Channel};
handle_call( handle_call(
@ -1240,7 +1244,6 @@ handle_info(
) when ) when
ConnState =:= connected orelse ConnState =:= reauthenticating ConnState =:= connected orelse ConnState =:= reauthenticating
-> ->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
Channel2 = Channel1#channel{session = Session1}, Channel2 = Channel1#channel{session = Session1},
@ -1354,8 +1357,9 @@ handle_timeout(
handle_out(publish, Replies, Channel#channel{session = NSession}) handle_out(publish, Replies, Channel#channel{session = NSession})
end; end;
handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) ->
Channel0 = maybe_publish_will_msg(expired, Channel),
ok = emqx_session:destroy(Session), ok = emqx_session:destroy(Session),
shutdown(expired, Channel); shutdown(expired, Channel0);
handle_timeout( handle_timeout(
_TRef, _TRef,
will_message = TimerName, will_message = TimerName,
@ -1439,10 +1443,9 @@ terminate({shutdown, Reason}, Channel) when
Reason =:= kicked orelse Reason =:= kicked orelse
Reason =:= discarded Reason =:= discarded
-> ->
Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel);
run_terminate_hook(Reason, Channel1);
terminate(Reason, Channel) -> terminate(Reason, Channel) ->
Channel1 = maybe_publish_will_msg(Reason, Channel), Channel1 = maybe_publish_will_msg(?chan_terminating, Channel),
run_terminate_hook(Reason, Channel1). run_terminate_hook(Reason, Channel1).
run_terminate_hook(_Reason, #channel{session = undefined}) -> run_terminate_hook(_Reason, #channel{session = undefined}) ->
@ -2234,74 +2237,149 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Maybe Publish will msg %% Maybe Publish will msg
%% @doc May publish will message [MQTT-3.1.2-8]
%% When willmsg presents the decision whether or when to publish the Will Message are effected by
%% the followings:
%% - connecion state
%% - If it is MQTT normal disconnection (RC: 0)
%% - If it is MQTT normal disconnection (RC: 4)
%% - will delay interval (MQTT 5.0 only)
%% - session expire Session Expiry (MQTT 5.0 only)
%% - EMQX operations on the client
%% @NOTE:
%% Connection close with session expiry interval = 0 means session close.
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when -spec maybe_publish_will_msg(Reason, channel()) -> channel() when
Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}. Reason ::
%% Connection will terminate because session is taken over by another process.
takenover
%% Connection will terminate because of EMQX mgmt operation, also delete the session.
| kicked
%% Connection will terminate because session is taken over by another process.
| discarded
%% Connection will terminate because session is expired
| expired
%% Connection will terminate because of socket close/error
| sock_closed
%% Connection will terminate with Reasons
| {shutdown, atom()}
%% Connection will terminate soon, delay willmsg publish is impossible.
| ?chan_terminating
%% Connection will terminate because of normal MQTT disconnection, implies delete the session.
| normal.
maybe_publish_will_msg(normal, Channel) ->
%% [MQTT-3.1.2-8]
Channel;
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
%% No will message to publish
Channel; Channel;
maybe_publish_will_msg({shutdown, not_authorized}, Channel) -> maybe_publish_will_msg(
Channel; _Reason,
maybe_publish_will_msg(not_authorized, Channel) -> Channel = #channel{
Channel; conn_state = ConnState,
maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when conninfo = #{clientid := ClientId}
}
) when
ConnState =:= idle orelse ConnState =:= idle orelse
ConnState =:= connecting orelse ConnState =:= connecting orelse
ConnState =:= reauthenticating ConnState =:= reauthenticating
-> ->
%% Wrong state to publish
?tp(debug, willmsg_wrong_state, #{clientid => ClientId}),
Channel; Channel;
maybe_publish_will_msg( maybe_publish_will_msg(
_Reason, _Reason,
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} Channel = #channel{
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg
}
) -> ) ->
%% Unconditionally publish will message for MQTT 3.1.1 %% Unconditionally publish will message for MQTT 3.1.1
ok = publish_will_msg(Channel#channel.clientinfo, WillMsg), ?tp(debug, willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} Reason,
Channel = #channel{
clientinfo = ClientInfo,
conninfo = #{clientid := ClientId},
will_msg = WillMsg
}
) when ) when
Reason =:= expired orelse Reason =:= expired orelse
Reason =:= discarded orelse Reason =:= discarded orelse
%% Unsure... Reason =:= kicked orelse
Reason =:= {shutdown, internal_error} orelse Reason =:= ?chan_terminating orelse
Reason =:= kicked %% Depends on the session backend, we may lost the session
Reason =:= {shutdown, internal_error}
-> ->
%% For the cases that session MUST be gone. %% For the cases that session MUST be gone impiles that the will message MUST be published
%% a. expired (session expired) %% a. expired (session expired)
%% c. discarded (Session ends because another process starts new session with the same clientid) %% b. discarded (Session ends because another process starts new session with the same clientid)
%% b. kicked. (kicked by operation) %% c. kicked. (kicked by operation)
%% d. internal_error (maybe not recoverable) %% d. internal_error (maybe not recoverable)
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled %% OR fired but not yet handled
DelayedWillTimer = maps:get(will_message, Timers, undefined), ?tp(debug, willmsg_session_ends, #{clientid => ClientId}),
DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; remove_willmsg(Channel);
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} takenover,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) -> ) ->
%% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not. %% TAKEOVER [MQTT-3.1.4-3]
%% willmsg publish could be defered. ?tp(debug, willmsg_takeover, #{clientid => ClientId}),
IsSessionExpirationInProgress = maps:is_key(expire_session, Timers),
IsWillmsgScheduled = maps:is_key(will_message, Timers),
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
%% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed %% MQTT 5, Non-normative comment:
false = IsWillmsgScheduled, %% """"
ok = publish_will_msg(ClientInfo, WillMsg), %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server,
Channel#channel{will_msg = undefined}; %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start
I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at
%% We delay the will message publishing %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent
%% Willmsg will be published whatever which timer fired first %% because the Session ends.
ensure_timer(will_message, timer:seconds(I), Channel); %% """"
_ when IsSessionExpirationInProgress andalso IsWillmsgScheduled -> _ = publish_will_msg(ClientInfo, WillMsg);
%% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9]. I when I > 0 ->
Channel; %% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish
_I when Reason =:= takenover -> %% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg
%% don't see the point to delay the willmsg %% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec.
Channel; skip
_I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled -> end,
Channel; remove_willmsg(Channel);
I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> maybe_publish_will_msg(
%% @FIXME: process may terminate before the timer fired {shutdown, _},
Channel = #channel{
conninfo = #{expiry_interval := 0, clientid := ClientId},
clientinfo = ClientInfo,
will_msg = WillMsg
}
) ->
%% MQTT 5: 3.1.2.11.2 Session Expiry Interval
%% If the Session Expiry Interval is absent the value 0 is used.
%% If it is set to 0, or is absent, the Session ends when the Network Connection is closed.
%% Expire_interval == 0, means session is over at the time of calling with shutdown.
?tp(debug, willmsg_takeover, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% Handles other Unknown Reasons.
case will_delay_interval(WillMsg) of
0 ->
?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
I when I > 0 ->
?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}),
ensure_timer(will_message, timer:seconds(I), Channel) ensure_timer(will_message, timer:seconds(I), Channel)
end. end.
@ -2434,6 +2512,18 @@ get_mqtt_conf(Zone, Key) ->
get_mqtt_conf(Zone, Key, Default) -> get_mqtt_conf(Zone, Key, Default) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).
-spec remove_willmsg(Old :: channel()) -> New :: channel().
remove_willmsg(Channel = #channel{timers = Timers}) ->
case maps:get(will_message, Timers, undefined) of
undefined ->
Channel#channel{will_msg = undefined};
DelayedWillTimer ->
ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]),
Channel#channel{
will_msg = undefined,
timers = maps:remove(will_message, Timers)
}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -50,17 +50,22 @@ end_per_suite(Config) ->
groups() -> groups() ->
[ [
{mqttv3, [], {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()},
emqx_common_test_helpers:all(?MODULE) --
[
t_session_expire_with_delayed_willmsg,
t_no_takeover_with_delayed_willmsg,
t_takeover_before_session_expire,
t_takeover_before_willmsg_expire
]},
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)} {mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
]. ].
tc_v5_only() ->
[
t_session_expire_with_delayed_willmsg,
t_no_takeover_with_delayed_willmsg,
t_takeover_before_session_expire,
t_takeover_before_willmsg_expire,
t_takeover_before_session_expire_willdelay0,
t_takeover_session_then_normal_disconnect,
t_takeover_session_then_abnormal_disconnect,
t_takeover_session_then_abnormal_disconnect_2
].
init_per_group(mqttv3, Config) -> init_per_group(mqttv3, Config) ->
lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3});
init_per_group(mqttv5, Config) -> init_per_group(mqttv5, Config) ->
@ -178,7 +183,6 @@ t_takeover_willmsg_clean_session(Config) ->
Middle = ?CNT div 2, Middle = ?CNT div 2,
Client1Msgs = messages(ClientId, 0, Middle), Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(ClientId, Middle, ?CNT div 2), Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs,
WillOpts = [ WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)}, {proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}, {clean_start, false},
@ -220,9 +224,7 @@ t_takeover_willmsg_clean_session(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>),
{IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
assert_messages_missed(AllMsgs, ReceivedNoWill),
assert_messages_order(AllMsgs, ReceivedNoWill),
%% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">>
?assert(IsWill1), ?assert(IsWill1),
?assertNot(IsWill2), ?assertNot(IsWill2),
@ -238,7 +240,6 @@ t_takeover_clean_session_with_delayed_willmsg(Config) ->
Middle = ?CNT div 2, Middle = ?CNT div 2,
Client1Msgs = messages(ClientId, 0, Middle), Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(ClientId, Middle, ?CNT div 2), Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs,
WillOpts = [ WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)}, {proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}, {clean_start, false},
@ -282,9 +283,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>),
{IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
assert_messages_missed(AllMsgs, ReceivedNoWill),
assert_messages_order(AllMsgs, ReceivedNoWill),
%% THEN: payload <<"willpayload_delay10">> should be published without delay. %% THEN: payload <<"willpayload_delay10">> should be published without delay.
?assert(IsWill1), ?assert(IsWill1),
?assertNot(IsWill2), ?assertNot(IsWill2),
@ -332,17 +331,19 @@ t_no_takeover_with_delayed_willmsg(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(Client1Msgs, Received), assert_messages_missed(Client1Msgs, Received),
{IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>),
?assertNot(IsWill0),
?assertNotEqual([], ReceivedNoWill0),
#{client := [CPidSub, CPid1]} = FCtx, #{client := [CPidSub, CPid1]} = FCtx,
%% WHEN: client disconnects abnormally AND no reconnect after 3s. %% WHEN: client disconnects abnormally AND no reconnect after 3s.
exit(CPid1, kill), exit(CPid1, kill),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay3">>),
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>), ?assertNot(IsWill1),
?assertNot(IsWill), ?assertEqual([], ReceivedNoWill1),
?assertEqual([], ReceivedNoWill), %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay (3 secs).
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay.
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>),
?assertEqual([], ReceivedNoWill11), ?assertEqual([], ReceivedNoWill11),
@ -388,6 +389,9 @@ t_session_expire_with_delayed_willmsg(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
assert_messages_missed(Client1Msgs, Received), assert_messages_missed(Client1Msgs, Received),
#{client := [CPidSub, CPid1]} = FCtx, #{client := [CPidSub, CPid1]} = FCtx,
%% WHEN: client disconnects abnormally AND no reconnect after 3s. %% WHEN: client disconnects abnormally AND no reconnect after 3s.
@ -395,25 +399,86 @@ t_session_expire_with_delayed_willmsg(Config) ->
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
?assertNot(IsWill), ?assertNot(IsWill1),
?assertEqual([], ReceivedNoWill), ?assertEqual([], ReceivedNoWill1),
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry.
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>),
?assertEqual([], ReceivedNoWill11), ?assertEqual([], ReceivedNoWill2),
?assert(IsWill11), ?assert(IsWill12),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid1)),
ok. ok.
%% @TODO 'Server-Keep-Alive'
%% t_no_takeover_keepalive_fired(Config) ->
%% ok.
t_takeover_before_session_expire_willdelay0(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 0}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(1000),
CTX
end,
[]
}
] ++
%% WHEN: client session is taken over within 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
?assert(IsWill),
emqtt:stop(CPidSub),
emqtt:stop(CPid2),
?assert(not is_process_alive(CPid1)),
ok.
t_takeover_before_session_expire(Config) -> t_takeover_before_session_expire(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME), ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>, WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10), Client1Msgs = messages(ClientId, 0, 10),
emqx_logger:set_log_level(debug),
WillOpts = [ WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)}, {proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}, {clean_start, false},
@ -443,7 +508,7 @@ t_takeover_before_session_expire(Config) ->
[] []
} }
] ++ ] ++
%% WHEN: client session is taken over with in 3s. %% WHEN: client session is taken over within 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl( FCtx = lists:foldl(
@ -460,29 +525,205 @@ t_takeover_before_session_expire(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(Client1Msgs, Received), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>),
?assertNot(IsWill), ?assertNot(IsWill),
?assertEqual([], ReceivedNoWill), ?assertNotEqual([], ReceivedNoWill),
%% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published.
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>),
?assertEqual([], ReceivedNoWill11),
?assertNot(IsWill11),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
emqtt:stop(CPid2), emqtt:stop(CPid2),
?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid1)),
ok. ok.
t_takeover_session_then_normal_disconnect(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect normally.
emqtt:disconnect(CPid2, ?RC_SUCCESS),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is not published.
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_session_then_abnormal_disconnect(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s, session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect abnormally
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is published before session expiry
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)],
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
%% AND THEN: willmsg is published after session expiry
?assert(IsWill1),
?assertEqual([], ReceivedNoWill1),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_session_then_abnormal_disconnect_2(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 0}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 0s, session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect abnormally
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is published after session expiry
?assert(IsWill),
?assertNotEqual([], ReceivedNoWill),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_before_willmsg_expire(Config) -> t_takeover_before_willmsg_expire(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME), ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>, WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10), Client1Msgs = messages(ClientId, 0, 10),
emqx_logger:set_log_level(debug),
WillOpts = [ WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)}, {proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}, {clean_start, false},
@ -590,8 +831,8 @@ t_kick_session(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
%% THEN: payload <<"willpayload_kick">> should be published %% THEN: payload <<"willpayload_kick">> should be published
{IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>),
?assert(IsWill1), ?assert(IsWill),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid1)),
ok. ok.