Merge pull request #11868 from qzhuyan/fix/william/takeover-pub-willmsg

fix: willmsg not published in takeover
This commit is contained in:
William Yang 2024-02-29 13:14:18 +01:00 committed by GitHub
commit b82973b36b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1004 additions and 51 deletions

View File

@ -117,7 +117,17 @@
atom() => term() atom() => term()
}. }.
-type conn_state() :: idle | connecting | connected | reauthenticating | disconnected. %% init
-type conn_state() ::
idle
%% mqtt connect recved but not acked
| connecting
%% mqtt connect acked
| connected
%% mqtt connected but reauthenticating
| reauthenticating
%% keepalive timeout or connection terminated
| disconnected.
-type reply() :: -type reply() ::
{outgoing, emqx_types:packet()} {outgoing, emqx_types:packet()}
@ -135,6 +145,7 @@
). ).
-define(LIMITER_ROUTING, message_routing). -define(LIMITER_ROUTING, message_routing).
-define(chan_terminating, chan_terminating).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@ -863,6 +874,7 @@ do_unsubscribe(
%% MQTT-v5.0: 3.14.4 DISCONNECT Actions %% MQTT-v5.0: 3.14.4 DISCONNECT Actions
maybe_clean_will_msg(?RC_SUCCESS, Channel) -> maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
%% [MQTT-3.14.4-3]
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
maybe_clean_will_msg(_ReasonCode, Channel) -> maybe_clean_will_msg(_ReasonCode, Channel) ->
Channel. Channel.
@ -1134,16 +1146,14 @@ handle_call(
kick, kick,
Channel = #channel{ Channel = #channel{
conn_state = ConnState, conn_state = ConnState,
will_msg = WillMsg,
clientinfo = ClientInfo,
conninfo = #{proto_ver := ProtoVer} conninfo = #{proto_ver := ProtoVer}
} }
) -> ) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), Channel0 = maybe_publish_will_msg(kicked, Channel),
Channel1 = Channel1 =
case ConnState of case ConnState of
connected -> ensure_disconnected(kicked, Channel); connected -> ensure_disconnected(kicked, Channel0);
_ -> Channel _ -> Channel0
end, end,
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
true -> true ->
@ -1157,7 +1167,8 @@ handle_call(
shutdown(kicked, ok, Channel1) shutdown(kicked, ok, Channel1)
end; end;
handle_call(discard, Channel) -> handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel); Channel0 = maybe_publish_will_msg(discarded, Channel),
disconnect_and_shutdown(discarded, ok, Channel0);
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
reply(Session, Channel#channel{takeover = true}); reply(Session, Channel#channel{takeover = true});
@ -1180,7 +1191,8 @@ handle_call(
emqx_channel_takeover_end, emqx_channel_takeover_end,
#{clientid => ClientId} #{clientid => ClientId}
), ),
disconnect_and_shutdown(takenover, AllPendings, Channel); Channel0 = maybe_publish_will_msg(takenover, Channel),
disconnect_and_shutdown(takenover, AllPendings, Channel0);
handle_call(list_authz_cache, Channel) -> handle_call(list_authz_cache, Channel) ->
{reply, emqx_authz_cache:list_authz_cache(), Channel}; {reply, emqx_authz_cache:list_authz_cache(), Channel};
handle_call( handle_call(
@ -1233,7 +1245,7 @@ handle_info(
ConnState =:= connected orelse ConnState =:= reauthenticating ConnState =:= connected orelse ConnState =:= reauthenticating
-> ->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
Channel2 = Channel1#channel{session = Session1}, Channel2 = Channel1#channel{session = Session1},
case maybe_shutdown(Reason, Intent, Channel2) of case maybe_shutdown(Reason, Intent, Channel2) of
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
@ -1345,8 +1357,9 @@ handle_timeout(
handle_out(publish, Replies, Channel#channel{session = NSession}) handle_out(publish, Replies, Channel#channel{session = NSession})
end; end;
handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) ->
Channel0 = maybe_publish_will_msg(expired, Channel),
ok = emqx_session:destroy(Session), ok = emqx_session:destroy(Session),
shutdown(expired, Channel); shutdown(expired, Channel0);
handle_timeout( handle_timeout(
_TRef, _TRef,
will_message = TimerName, will_message = TimerName,
@ -1424,19 +1437,16 @@ terminate(_, #channel{conn_state = idle} = _Channel) ->
ok; ok;
terminate(normal, Channel) -> terminate(normal, Channel) ->
run_terminate_hook(normal, Channel); run_terminate_hook(normal, Channel);
terminate({shutdown, kicked}, Channel) ->
run_terminate_hook(kicked, Channel);
terminate({shutdown, Reason}, Channel) when terminate({shutdown, Reason}, Channel) when
Reason =:= discarded; Reason =:= expired orelse
Reason =:= takenover Reason =:= takenover orelse
Reason =:= kicked orelse
Reason =:= discarded
-> ->
run_terminate_hook(Reason, Channel); run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> terminate(Reason, Channel) ->
%% since will_msg is set to undefined as soon as it is published, Channel1 = maybe_publish_will_msg(?chan_terminating, Channel),
%% if will_msg still exists when the session is terminated, it run_terminate_hook(Reason, Channel1).
%% must be published immediately.
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
run_terminate_hook(Reason, Channel).
run_terminate_hook(_Reason, #channel{session = undefined}) -> run_terminate_hook(_Reason, #channel{session = undefined}) ->
ok; ok;
@ -2227,15 +2237,139 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Maybe Publish will msg %% Maybe Publish will msg
%% @doc Maybe publish will message [MQTT-3.1.2-8]
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> %% When willmsg presents the decision whether or when to publish the Will Message are effected by
%% the followings:
%% - connecion state
%% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client*
%% - will delay interval (MQTT 5.0 only)
%% - session expire Session Expiry (MQTT 5.0 only)
%% - EMQX operations on the client
%% @NOTE:
%% Connection close with session expiry interval = 0 means session close.
%% @NOTE:
%% The caller does not need to take care of the case when process terminates while will_msg is published
%% as it is designed by the spec.
%% @NOTE:
%% this function should be safe to be called multiple times in the life time of the connecion process, the willmsg
%% must be delete from the state if it is published or cleared.
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
Reason ::
%% Connection is terminating because session is taken over by another process.
takenover
%% Connection is terminating because of EMQX mgmt operation, the session state is deleted with none-zero RC code
| kicked
%% Connection is terminating because of client clean start new session.
| discarded
%% Connection is terminating because session is expired
| expired
%% Connection is terminating because of socket close/error
| sock_closed
%% Session is terminating, delay willmsg publish is impossible.
| ?chan_terminating.
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
%% No will message to publish
Channel; Channel;
maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> maybe_publish_will_msg(
_Reason,
Channel = #channel{
conn_state = ConnState,
conninfo = #{clientid := ClientId}
}
) when
ConnState =:= idle orelse
ConnState =:= connecting orelse
ConnState =:= reauthenticating
->
%% Wrong state to publish, they are intermediate state
?tp(debug, willmsg_wrong_state, #{clientid => ClientId}),
Channel;
maybe_publish_will_msg(
_Reason,
Channel = #channel{
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg
}
) ->
%% Unconditionally publish will message for MQTT 3.1.1
?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
Channel#channel{will_msg = undefined};
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
conninfo = #{clientid := ClientId},
will_msg = WillMsg
}
) when
Reason =:= expired orelse
Reason =:= discarded orelse
Reason =:= kicked orelse
Reason =:= ?chan_terminating orelse
%% Depends on the session backend, we may lost the session
Reason =:= {shutdown, internal_error}
->
%% For the cases that session MUST be gone impiles that the will message MUST be published
%% a. expired (session expired)
%% b. discarded (Session ends because of clean start)
%% c. kicked. (kicked by operation, abnormal conn close)
%% d. internal_error (maybe not recoverable)
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled
?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
maybe_publish_will_msg(
takenover,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% TAKEOVER [MQTT-3.1.4-3]
%% MQTT 5, Non-normative comment:
%% """"
%% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server,
%% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start
%% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at
%% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent
%% because the Session ends.
%% """"
%% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario.
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
ok = publish_will_msg(ClientInfo, WillMsg), ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
Channel#channel{will_msg = undefined}; _ = publish_will_msg(ClientInfo, WillMsg);
I -> I when I > 0 ->
%% @NOTE Non-normative comment in MQTT 5.0 spec
%% """
%% One use of this is to avoid publishing Will Messages if there is a temporary network
%% disconnection and the Client succeeds in reconnecting and continuing its Session
%% before the Will Message is published.
%% """
?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}),
skip
end,
remove_willmsg(Channel);
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% Default to handle other reasons
case will_delay_interval(WillMsg) of
0 ->
?tp(debug, maybe_publish_will_msg_other_publish, #{
clientid => ClientId, reason => Reason
}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
I when I > 0 ->
?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
ensure_timer(will_message, timer:seconds(I), Channel) ensure_timer(will_message, timer:seconds(I), Channel)
end. end.
@ -2368,6 +2502,19 @@ get_mqtt_conf(Zone, Key) ->
get_mqtt_conf(Zone, Key, Default) -> get_mqtt_conf(Zone, Key, Default) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).
%% @doc unset will_msg and cancel the will_message timer
-spec remove_willmsg(Old :: channel()) -> New :: channel().
remove_willmsg(Channel = #channel{timers = Timers}) ->
case maps:get(will_message, Timers, undefined) of
undefined ->
Channel#channel{will_msg = undefined};
DelayedWillTimer ->
ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]),
Channel#channel{
will_msg = undefined,
timers = maps:remove(will_message, Timers)
}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{shutdown, Reason, Reply, OutPacket, NChannel} -> {shutdown, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel}, NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState), ok = handle_outgoing(OutPacket, NState),
shutdown(Reason, Reply, NState) NState2 = graceful_shutdown_transport(Reason, NState),
shutdown(Reason, Reply, NState2)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) ->
async_set_keepalive(Idle, Interval, Probes) async_set_keepalive(Idle, Interval, Probes)
end. end.
-spec graceful_shutdown_transport(atom(), state()) -> state().
graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) ->
%% @TODO Reason is reserved for future use, quic transport
Transport:shutdown(Socket, read_write),
S#state{sockstate = closed}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -32,6 +32,7 @@
wait/1, wait/1,
getstat/2, getstat/2,
fast_close/1, fast_close/1,
shutdown/2,
ensure_ok_or_exit/2, ensure_ok_or_exit/2,
async_send/3, async_send/3,
setopts/2, setopts/2,
@ -147,6 +148,10 @@ fast_close({quic, _Conn, Stream, _Info}) ->
% quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), % quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
ok. ok.
shutdown({quic, _Conn, Stream, _Info}, read_write) ->
%% A graceful shutdown means both side shutdown the read and write gracefully.
quicer:shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1, 5000).
-spec ensure_ok_or_exit(atom(), list(term())) -> term(). -spec ensure_ok_or_exit(atom(), list(term())) -> term().
ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
case erlang:apply(?MODULE, Fun, Args) of case erlang:apply(?MODULE, Fun, Args) of

View File

@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% Meck Transport %% Meck Transport
ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end),
%% Meck Channel %% Meck Channel
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm %% Meck Cm

View File

@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) ->
?assertMatch([_], emqx:publish(Message3)), ?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)), ?assertMatch([_], emqx:publish(Message4)),
{true, _} = last_message(<<"hello2">>, [ConnPid2]), {true, _} = last_message(<<"hello2">>, [ConnPid2]),
%% We may or may not recv dup hello2 due to QoS1 redelivery
_ = last_message(<<"hello2">>, [ConnPid2]),
{true, _} = last_message(<<"hello3">>, [ConnPid2]), {true, _} = last_message(<<"hello3">>, [ConnPid2]),
{true, _} = last_message(<<"hello4">>, [ConnPid2]), {true, _} = last_message(<<"hello4">>, [ConnPid2]),
?assertEqual([], collect_msgs(timer:seconds(2))), ?assertEqual([], collect_msgs(timer:seconds(2))),
@ -951,6 +953,8 @@ t_session_kicked(Config) when is_list(Config) ->
%% on if it's picked as the first one for round_robin %% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]),
case MsgRec2 of case MsgRec2 of
<<"hello3">> -> <<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1); ?assertEqual(<<"hello1">>, MsgRec1);

View File

@ -24,47 +24,83 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(TOPIC, <<"t">>).
-define(CNT, 100). -define(CNT, 100).
-define(SLEEP, 10). -define(SLEEP, 10).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Initial funcs %% Initial funcs
all() -> emqx_common_test_helpers:all(?MODULE). all() ->
[
{group, mqttv3},
{group, mqttv5}
].
init_per_suite(Config) -> init_per_suite(Config) ->
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[emqx], [emqx],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
emqx_logger:set_log_level(debug),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
ok = emqx_cth_suite:stop(Apps), ok = emqx_cth_suite:stop(Apps),
ok. ok.
groups() ->
[
{mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()},
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
].
tc_v5_only() ->
[
t_session_expire_with_delayed_willmsg,
t_no_takeover_with_delayed_willmsg,
t_takeover_before_session_expire,
t_takeover_before_willmsg_expire,
t_takeover_before_session_expire_willdelay0,
t_takeover_session_then_normal_disconnect,
t_takeover_session_then_abnormal_disconnect,
t_takeover_session_then_abnormal_disconnect_2
].
init_per_group(mqttv3, Config) ->
lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3});
init_per_group(mqttv5, Config) ->
lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}).
end_per_group(_Group, _Config) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Testcases %% Testcases
t_takeover(_) -> t_takeover(Config) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientId = <<"clientid">>, ClientId = atom_to_binary(?FUNCTION_NAME),
ClientOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}
],
Middle = ?CNT div 2, Middle = ?CNT div 2,
Client1Msgs = messages(0, Middle), Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(Middle, ?CNT div 2), Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs, AllMsgs = Client1Msgs ++ Client2Msgs,
meck:new(emqx_cm, [non_strict, passthrough]), meck:new(emqx_cm, [non_strict, passthrough]),
meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> meck:expect(emqx_cm, takeover_session_end, fun(Arg) ->
%% trigger more complex takeover conditions during 2-phase takeover protocol:
%% when messages are accumulated in 2 processes simultaneously,
%% and need to be properly ordered / deduplicated after the protocol commences.
ok = timer:sleep(?SLEEP * 2), ok = timer:sleep(?SLEEP * 2),
meck:passthrough([Arg]) meck:passthrough([Arg])
end), end),
Commands = Commands =
[{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
[{fun stop_client/1, []}], [{fun stop_client/1, []}],
@ -78,30 +114,751 @@ t_takeover(_) ->
), ),
#{client := [CPid2, CPid1]} = FCtx, #{client := [CPid2, CPid1]} = FCtx,
?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}),
?assertReceive({'EXIT', CPid2, normal}),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
?assertReceive({'EXIT', CPid2, normal}),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("middle: ~p", [Middle]), ct:pal("middle: ~p", [Middle]),
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(AllMsgs, Received), assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received), assert_messages_order(AllMsgs, Received),
meck:unload(emqx_cm), meck:unload(emqx_cm),
ok. ok.
t_takover_in_cluster(_) -> t_takeover_willmsg(Config) ->
todo. process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Middle = ?CNT div 2,
Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
AllMsgs = Client1Msgs ++ Client2Msgs,
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload">>},
{will_qos, 0}
],
Commands =
%% GIVEN client connect with will message
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
%% WHEN client reconnect with clean_start = false
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
assert_messages_missed(AllMsgs, ReceivedNoWill),
assert_messages_order(AllMsgs, ReceivedNoWill),
%% THEN will message should be received
?assert(IsWill),
emqtt:stop(CPidSub),
emqtt:stop(CPid2),
?assertReceive({'EXIT', CPid2, normal}),
?assert(not is_process_alive(CPid1)),
ok.
t_takeover_willmsg_clean_session(Config) ->
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Middle = ?CNT div 2,
Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_1">>},
{will_qos, 1}
],
WillOptsClean = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, true},
{will_topic, WillTopic},
{will_payload, <<"willpayload_2">>},
{will_qos, 1}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_1">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
%% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>),
{IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
%% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">>
?assert(IsWill1),
?assertNot(IsWill2),
emqtt:stop(CPid2),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
t_takeover_clean_session_with_delayed_willmsg(Config) ->
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Middle = ?CNT div 2,
Client1Msgs = messages(ClientId, 0, Middle),
Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
%% mqttv5 only
{will_props, #{'Will-Delay-Interval' => 10}}
],
WillOptsClean = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, true},
{will_topic, WillTopic},
{will_payload, <<"willpayload_2">>},
{will_qos, 1}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> and delay-interval 10s
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
%% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>),
{IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
%% THEN: payload <<"willpayload_delay10">> should be published without delay
?assert(IsWill1),
?assertNot(IsWill2),
emqtt:stop(CPid2),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
t_no_takeover_with_delayed_willmsg(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay3">>},
{will_qos, 1},
%secs
{will_props, #{'Will-Delay-Interval' => 3}},
% secs
{properties, #{'Session-Expiry-Interval' => 10}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(Client1Msgs, Received),
{IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>),
?assertNot(IsWill0),
?assertNotEqual([], ReceivedNoWill0),
#{client := [CPidSub, CPid1]} = FCtx,
%% WHEN: client disconnects abnormally AND no reconnect after 3s.
exit(CPid1, kill),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay3">>),
?assertNot(IsWill1),
?assertEqual([], ReceivedNoWill1),
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay (3 secs).
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>),
?assertEqual([], ReceivedNoWill11),
?assert(IsWill11),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
t_session_expire_with_delayed_willmsg(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
assert_messages_missed(Client1Msgs, Received),
#{client := [CPidSub, CPid1]} = FCtx,
%% WHEN: client disconnects abnormally AND no reconnect after 3s.
exit(CPid1, kill),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
?assertNot(IsWill1),
?assertEqual([], ReceivedNoWill1),
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry.
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>),
?assertEqual([], ReceivedNoWill2),
?assert(IsWill12),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
%% @TODO 'Server-Keep-Alive'
%% t_no_takeover_keepalive_fired(Config) ->
%% ok.
t_takeover_before_session_expire_willdelay0(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 0}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 0s session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(1000),
CTX
end,
[]
}
] ++
%% WHEN: client session is taken over within 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is published
?assert(IsWill),
emqtt:stop(CPidSub),
emqtt:stop(CPid2),
?assert(not is_process_alive(CPid1)),
ok.
t_takeover_before_session_expire(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% WHEN: client session is taken over within 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
ct:pal("FCtx: ~p", [FCtx]),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: No Willmsg is published
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
emqtt:stop(CPidSub),
emqtt:stop(CPid2),
?assert(not is_process_alive(CPid1)),
ok.
t_takeover_session_then_normal_disconnect(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect normally.
emqtt:disconnect(CPid2, ?RC_SUCCESS),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is not published.
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_session_then_abnormal_disconnect(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 10}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and will-delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect abnormally
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is not published before session expiry
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)],
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
%% AND THEN: willmsg is published after session expiry
?assert(IsWill1),
?assertEqual([], ReceivedNoWill1),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_session_then_abnormal_disconnect_2(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay1">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 1}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
WillOpts2 = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay2">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 0}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
Commands =
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% GIVEN: client *reconnect* with willmsg payload <<"willpayload_delay2">>
%% and will-delay-interval 0s, session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts2]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect abnormally
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay1">>),
%% THEN: willmsg1 of old conn is not published because will-delay-interval > 0
?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill),
%% THEN: willmsg1 is published because will-delay-interval is 0
{IsWill2, _} = filter_payload(Received, <<"willpayload_delay2">>),
?assert(IsWill2),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)),
ok.
t_takeover_before_willmsg_expire(Config) ->
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
Client1Msgs = messages(ClientId, 0, 10),
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 3}},
{properties, #{'Session-Expiry-Interval' => 10}}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and will-delay-interval 3s < session expiry 10s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
[
%% avoid two clients race for session takeover
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% WHEN: another client takeover the session with in 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
ct:pal("FCtx: ~p", [FCtx]),
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
assert_messages_missed(Client1Msgs, Received),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>),
?assertNot(IsWill),
?assertEqual([], ReceivedNoWill),
%% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published after 3s.
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>),
?assertEqual([], ReceivedNoWill11),
?assertNot(IsWill11),
emqtt:stop(CPidSub),
emqtt:stop(CPid2),
?assert(not is_process_alive(CPid1)),
ok.
t_kick_session(Config) ->
process_flag(trap_exit, true),
ClientId = atom_to_binary(?FUNCTION_NAME),
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
WillOpts = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_kick">>},
{will_qos, 1}
],
Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_kick">>
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[
{fun start_client/5, [
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
]}
] ++
[
%% kick may fail (not found) without this delay
{
fun(CTX) ->
timer:sleep(100),
CTX
end,
[]
}
] ++
%% WHEN: client is kicked with kick_session
[{fun kick_client/2, [ClientId]}],
FCtx = lists:foldl(
fun({Fun, Args}, Ctx) ->
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
apply(Fun, [Ctx | Args])
end,
#{},
Commands
),
#{client := [CPidSub, CPid1]} = FCtx,
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked),
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
%% THEN: payload <<"willpayload_kick">> should be published
{IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>),
?assert(IsWill),
emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)),
ok.
%% t_takover_in_cluster(_) ->
%% todo.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Commands %% Commands
start_client(Ctx, ClientId, Topic, Qos, Opts) ->
start_client(Ctx, ClientId, Topic, Qos) -> {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]),
{ok, CPid} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{clean_start, false}
]),
_ = erlang:spawn_link(fun() -> _ = erlang:spawn_link(fun() ->
{ok, _} = emqtt:connect(CPid), {ok, _} = emqtt:connect(CPid),
ct:pal("CLIENT: connected ~p", [CPid]), ct:pal("CLIENT: connected ~p", [CPid]),
@ -109,6 +866,10 @@ start_client(Ctx, ClientId, Topic, Qos) ->
end), end),
Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
kick_client(Ctx, ClientId) ->
ok = emqx_cm:kick_session(ClientId),
Ctx.
publish_msg(Ctx, Msg) -> publish_msg(Ctx, Msg) ->
ok = timer:sleep(rand:uniform(?SLEEP)), ok = timer:sleep(rand:uniform(?SLEEP)),
case emqx:publish(Msg) of case emqx:publish(Msg) of
@ -157,8 +918,8 @@ assert_messages_order([Msg | Expected], Received) ->
assert_messages_order(Expected, Rest) assert_messages_order(Expected, Rest)
end. end.
messages(Offset, Cnt) -> messages(Topic, Offset, Cnt) ->
[emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. [emqx_message:make(ct, ?QOS_1, Topic, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
payload(I) -> payload(I) ->
% NOTE % NOTE
@ -170,3 +931,29 @@ payload(I) ->
emqx_utils_calendar:now_to_rfc3339(millisecond) emqx_utils_calendar:now_to_rfc3339(millisecond)
]) ])
). ).
%% @doc Filter out the message with matching target payload from the list of messages.
%% return '{IsTargetFound, ListOfOtherMessages}'
%% @end
-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) ->
{IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}.
filter_payload(List, Payload) when is_binary(Payload) ->
Filtered = [
Msg
|| #{payload := P} = Msg <- List,
P =/= Payload
],
{length(List) =/= length(Filtered), Filtered}.
%% @doc assert emqtt *client* process exits as expected.
assert_client_exit(Pid, v5, takenover) ->
%% @ref: MQTT 5.0 spec [MQTT-3.1.4-3]
?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}});
assert_client_exit(Pid, v3, takenover) ->
?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}});
assert_client_exit(Pid, v3, kicked) ->
?assertReceive({'EXIT', Pid, _});
assert_client_exit(Pid, v5, kicked) ->
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
assert_client_exit(Pid, _, killed) ->
?assertReceive({'EXIT', Pid, killed}).

View File

@ -0,0 +1,2 @@
Fix a bug that willmsg is not published after session takeover.