fix: handle delayed willmsg, part 1
This commit is contained in:
parent
b76c701b1c
commit
dd62280e04
|
@ -117,7 +117,17 @@
|
||||||
atom() => term()
|
atom() => term()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type conn_state() :: idle | connecting | connected | reauthenticating | disconnected.
|
%% init
|
||||||
|
-type conn_state() ::
|
||||||
|
idle
|
||||||
|
%% mqtt connect recved but not acked
|
||||||
|
| connecting
|
||||||
|
%% mqtt connect acked
|
||||||
|
| connected
|
||||||
|
%% mqtt connected but reauthenticating
|
||||||
|
| reauthenticating
|
||||||
|
%% keepalive timeout or connection terminated
|
||||||
|
| disconnected.
|
||||||
|
|
||||||
-type reply() ::
|
-type reply() ::
|
||||||
{outgoing, emqx_types:packet()}
|
{outgoing, emqx_types:packet()}
|
||||||
|
@ -1137,10 +1147,11 @@ handle_call(
|
||||||
conninfo = #{proto_ver := ProtoVer}
|
conninfo = #{proto_ver := ProtoVer}
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
|
Channel0 = maybe_publish_will_msg(kicked, Channel),
|
||||||
Channel1 =
|
Channel1 =
|
||||||
case ConnState of
|
case ConnState of
|
||||||
connected -> ensure_disconnected(kicked, Channel);
|
connected -> ensure_disconnected(kicked, Channel0);
|
||||||
_ -> Channel
|
_ -> Channel0
|
||||||
end,
|
end,
|
||||||
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
|
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
|
||||||
true ->
|
true ->
|
||||||
|
@ -1422,6 +1433,14 @@ terminate(_, #channel{conn_state = idle} = _Channel) ->
|
||||||
ok;
|
ok;
|
||||||
terminate(normal, Channel) ->
|
terminate(normal, Channel) ->
|
||||||
run_terminate_hook(normal, Channel);
|
run_terminate_hook(normal, Channel);
|
||||||
|
terminate({shutdown, Reason}, Channel) when
|
||||||
|
Reason =:= expired orelse
|
||||||
|
Reason =:= takenover orelse
|
||||||
|
Reason =:= kicked orelse
|
||||||
|
Reason =:= discarded
|
||||||
|
->
|
||||||
|
Channel1 = maybe_publish_will_msg(Reason, Channel),
|
||||||
|
run_terminate_hook(Reason, Channel1);
|
||||||
terminate(Reason, Channel) ->
|
terminate(Reason, Channel) ->
|
||||||
Channel1 = maybe_publish_will_msg(Reason, Channel),
|
Channel1 = maybe_publish_will_msg(Reason, Channel),
|
||||||
run_terminate_hook(Reason, Channel1).
|
run_terminate_hook(Reason, Channel1).
|
||||||
|
@ -2216,9 +2235,19 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Maybe Publish will msg
|
%% Maybe Publish will msg
|
||||||
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
|
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
|
||||||
Reason :: kick | sock_closed | {shutdown, atom()}.
|
Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}.
|
||||||
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
|
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
|
||||||
Channel;
|
Channel;
|
||||||
|
maybe_publish_will_msg({shutdown, not_authorized}, Channel) ->
|
||||||
|
Channel;
|
||||||
|
maybe_publish_will_msg(not_authorized, Channel) ->
|
||||||
|
Channel;
|
||||||
|
maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when
|
||||||
|
ConnState =:= idle orelse
|
||||||
|
ConnState =:= connecting orelse
|
||||||
|
ConnState =:= reauthenticating
|
||||||
|
->
|
||||||
|
Channel;
|
||||||
maybe_publish_will_msg(
|
maybe_publish_will_msg(
|
||||||
_Reason,
|
_Reason,
|
||||||
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg}
|
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg}
|
||||||
|
@ -2229,30 +2258,51 @@ maybe_publish_will_msg(
|
||||||
maybe_publish_will_msg(
|
maybe_publish_will_msg(
|
||||||
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
||||||
) when
|
) when
|
||||||
Reason =:= {shutdown, expired} orelse
|
Reason =:= expired orelse
|
||||||
Reason =:= {shutdown, kicked}
|
Reason =:= discarded orelse
|
||||||
|
%% Unsure...
|
||||||
|
Reason =:= {shutdown, internal_error} orelse
|
||||||
|
Reason =:= kicked
|
||||||
->
|
->
|
||||||
%% Must publish now without delay and cancel the will message timer.
|
%% For the cases that session MUST be gone.
|
||||||
|
%% a. expired (session expired)
|
||||||
|
%% c. discarded (Session ends because another process starts new session with the same clientid)
|
||||||
|
%% b. kicked. (kicked by operation)
|
||||||
|
%% d. internal_error (maybe not recoverable)
|
||||||
|
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
|
||||||
|
%% OR fired but not yet handled
|
||||||
DelayedWillTimer = maps:get(will_message, Timers, undefined),
|
DelayedWillTimer = maps:get(will_message, Timers, undefined),
|
||||||
DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]),
|
DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]),
|
||||||
_ = publish_will_msg(ClientInfo, WillMsg),
|
_ = publish_will_msg(ClientInfo, WillMsg),
|
||||||
Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)};
|
Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)};
|
||||||
maybe_publish_will_msg(
|
maybe_publish_will_msg(
|
||||||
_OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
||||||
) ->
|
) ->
|
||||||
case maps:get(will_message, Timers, undefined) of
|
%% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not.
|
||||||
undefined ->
|
%% willmsg publish could be defered.
|
||||||
%% May defer the will message publishing
|
IsSessionExpirationInProgress = maps:is_key(expire_session, Timers),
|
||||||
|
IsWillmsgScheduled = maps:is_key(will_message, Timers),
|
||||||
case will_delay_interval(WillMsg) of
|
case will_delay_interval(WillMsg) of
|
||||||
0 ->
|
0 ->
|
||||||
|
%% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed
|
||||||
|
false = IsWillmsgScheduled,
|
||||||
ok = publish_will_msg(ClientInfo, WillMsg),
|
ok = publish_will_msg(ClientInfo, WillMsg),
|
||||||
Channel#channel{will_msg = undefined};
|
Channel#channel{will_msg = undefined};
|
||||||
I ->
|
I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled ->
|
||||||
|
%% We delay the will message publishing
|
||||||
|
%% Willmsg will be published whatever which timer fired first
|
||||||
|
ensure_timer(will_message, timer:seconds(I), Channel);
|
||||||
|
_ when IsSessionExpirationInProgress andalso IsWillmsgScheduled ->
|
||||||
|
%% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9].
|
||||||
|
Channel;
|
||||||
|
_I when Reason =:= takenover ->
|
||||||
|
%% don't see the point to delay the willmsg
|
||||||
|
Channel;
|
||||||
|
_I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled ->
|
||||||
|
Channel;
|
||||||
|
I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled ->
|
||||||
|
%% @FIXME: process may terminate before the timer fired
|
||||||
ensure_timer(will_message, timer:seconds(I), Channel)
|
ensure_timer(will_message, timer:seconds(I), Channel)
|
||||||
end;
|
|
||||||
%% Will message is already scheduled
|
|
||||||
_ ->
|
|
||||||
Channel
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
will_delay_interval(WillMsg) ->
|
will_delay_interval(WillMsg) ->
|
||||||
|
|
|
@ -923,6 +923,7 @@ t_session_kicked({init, Config}) when is_list(Config) ->
|
||||||
t_session_kicked({'end', Config}) when is_list(Config) ->
|
t_session_kicked({'end', Config}) when is_list(Config) ->
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
||||||
t_session_kicked(Config) when is_list(Config) ->
|
t_session_kicked(Config) when is_list(Config) ->
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
Topic = <<"foo/bar/1">>,
|
Topic = <<"foo/bar/1">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -953,6 +954,8 @@ t_session_kicked(Config) when is_list(Config) ->
|
||||||
%% on if it's picked as the first one for round_robin
|
%% on if it's picked as the first one for round_robin
|
||||||
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||||
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||||
|
|
||||||
|
ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]),
|
||||||
case MsgRec2 of
|
case MsgRec2 of
|
||||||
<<"hello3">> ->
|
<<"hello3">> ->
|
||||||
?assertEqual(<<"hello1">>, MsgRec1);
|
?assertEqual(<<"hello1">>, MsgRec1);
|
||||||
|
|
|
@ -224,6 +224,68 @@ t_takeover_willmsg_clean_session(Config) ->
|
||||||
?assert(not is_process_alive(CPid1)),
|
?assert(not is_process_alive(CPid1)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_takeover_clean_session_with_delayed_willmsg(Config) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||||
|
Middle = ?CNT div 2,
|
||||||
|
Client1Msgs = messages(ClientId, 0, Middle),
|
||||||
|
Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
|
||||||
|
AllMsgs = Client1Msgs ++ Client2Msgs,
|
||||||
|
WillOpts = [
|
||||||
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
|
{clean_start, false},
|
||||||
|
{will_topic, WillTopic},
|
||||||
|
{will_payload, <<"willpayload_delay10">>},
|
||||||
|
{will_qos, 1},
|
||||||
|
%% mqttv5 only
|
||||||
|
{properties, #{'Will-Delay-Interval' => 10000}}
|
||||||
|
],
|
||||||
|
WillOptsClean = [
|
||||||
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
|
{clean_start, true},
|
||||||
|
{will_topic, WillTopic},
|
||||||
|
{will_payload, <<"willpayload_2">>},
|
||||||
|
{will_qos, 1}
|
||||||
|
],
|
||||||
|
|
||||||
|
Commands =
|
||||||
|
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> and delay-interval 10s
|
||||||
|
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||||
|
[
|
||||||
|
{fun start_client/5, [
|
||||||
|
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||||
|
]}
|
||||||
|
] ++
|
||||||
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||||
|
%% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">>
|
||||||
|
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++
|
||||||
|
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
||||||
|
|
||||||
|
FCtx = lists:foldl(
|
||||||
|
fun({Fun, Args}, Ctx) ->
|
||||||
|
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||||
|
apply(Fun, [Ctx | Args])
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Commands
|
||||||
|
),
|
||||||
|
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||||
|
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||||
|
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
|
||||||
|
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||||
|
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>),
|
||||||
|
{IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
|
||||||
|
assert_messages_missed(AllMsgs, ReceivedNoWill),
|
||||||
|
assert_messages_order(AllMsgs, ReceivedNoWill),
|
||||||
|
%% THEN: payload <<"willpayload_delay10">> should be published without delay.
|
||||||
|
?assert(IsWill1),
|
||||||
|
?assertNot(IsWill2),
|
||||||
|
emqtt:stop(CPid2),
|
||||||
|
emqtt:stop(CPidSub),
|
||||||
|
?assert(not is_process_alive(CPid1)),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_kick_session(Config) ->
|
t_kick_session(Config) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
|
Loading…
Reference in New Issue