Merge pull request #12742 from thalesmg/ds-will-msg-m-20240318
feat(ds): make durable sessions handle will messages
This commit is contained in:
commit
4af1a8cc56
|
@ -143,6 +143,7 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
|
|||
|
||||
start_client(Opts0 = #{}) ->
|
||||
Defaults = #{
|
||||
port => 1883,
|
||||
proto_ver => v5,
|
||||
properties => #{'Session-Expiry-Interval' => 300}
|
||||
},
|
||||
|
@ -189,6 +190,23 @@ list_all_subscriptions(Node) ->
|
|||
list_all_pubranges(Node) ->
|
||||
erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
|
||||
|
||||
session_open(Node, ClientId) ->
|
||||
ClientInfo = #{},
|
||||
ConnInfo = #{peername => {undefined, undefined}},
|
||||
WillMsg = undefined,
|
||||
erpc:call(
|
||||
Node,
|
||||
emqx_persistent_session_ds,
|
||||
session_open,
|
||||
[ClientId, ClientInfo, ConnInfo, WillMsg]
|
||||
).
|
||||
|
||||
force_last_alive_at(ClientId, Time) ->
|
||||
{ok, S0} = emqx_persistent_session_ds_state:open(ClientId),
|
||||
S = emqx_persistent_session_ds_state:set_last_alive_at(Time, S0),
|
||||
_ = emqx_persistent_session_ds_state:commit(S),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -243,10 +261,7 @@ t_session_subscription_idempotency(Config) ->
|
|||
end,
|
||||
fun(Trace) ->
|
||||
ct:pal("trace:\n ~p", [Trace]),
|
||||
ConnInfo = #{peername => {undefined, undefined}},
|
||||
Session = erpc:call(
|
||||
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
|
||||
),
|
||||
Session = session_open(Node1, ClientId),
|
||||
?assertMatch(
|
||||
#{SubTopicFilter := #{}},
|
||||
emqx_session:info(subscriptions, Session)
|
||||
|
@ -320,10 +335,7 @@ t_session_unsubscription_idempotency(Config) ->
|
|||
end,
|
||||
fun(Trace) ->
|
||||
ct:pal("trace:\n ~p", [Trace]),
|
||||
ConnInfo = #{peername => {undefined, undefined}},
|
||||
Session = erpc:call(
|
||||
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
|
||||
),
|
||||
Session = session_open(Node1, ClientId),
|
||||
?assertEqual(
|
||||
#{},
|
||||
emqx_session:info(subscriptions, Session)
|
||||
|
@ -552,6 +564,7 @@ t_session_gc(Config) ->
|
|||
),
|
||||
|
||||
%% Clients are still alive; no session is garbage collected.
|
||||
?tp(notice, "waiting for gc", #{}),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
?block_until(
|
||||
|
@ -564,9 +577,11 @@ t_session_gc(Config) ->
|
|||
),
|
||||
?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
|
||||
?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
|
||||
?tp(notice, "gc ran", #{}),
|
||||
|
||||
%% Now we disconnect 2 of them; only those should be GC'ed.
|
||||
|
||||
?tp(notice, "disconnecting client1", #{}),
|
||||
?assertMatch(
|
||||
{ok, {ok, _}},
|
||||
?wait_async_action(
|
||||
|
@ -671,3 +686,42 @@ t_session_replay_retry(_Config) ->
|
|||
[maps:with([topic, payload, qos], P) || P <- Pubs0],
|
||||
[maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2]
|
||||
).
|
||||
|
||||
%% Check that we send will messages when performing GC without relying on timers set by
|
||||
%% the channel process.
|
||||
t_session_gc_will_message(_Config) ->
|
||||
?check_trace(
|
||||
#{timetrap => 10_000},
|
||||
begin
|
||||
WillTopic = <<"will/t">>,
|
||||
ok = emqx:subscribe(WillTopic, #{qos => 2}),
|
||||
ClientId = <<"will_msg_client">>,
|
||||
Client = start_client(#{
|
||||
clientid => ClientId,
|
||||
will_topic => WillTopic,
|
||||
will_payload => <<"will payload">>,
|
||||
will_qos => 0,
|
||||
will_props => #{'Will-Delay-Interval' => 300}
|
||||
}),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
%% Use reason code =/= `?RC_SUCCESS' to allow will message
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
|
||||
#{?snk_kind := emqx_cm_clean_down}
|
||||
),
|
||||
?assertNotReceive({deliver, WillTopic, _}),
|
||||
%% Set fake `last_alive_at' to trigger immediate will message.
|
||||
force_last_alive_at(ClientId, _Time = 0),
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_persistent_session_ds_gc_worker:check_session(ClientId),
|
||||
#{?snk_kind := session_gc_published_will_msg}
|
||||
),
|
||||
?assertReceive({deliver, WillTopic, _}),
|
||||
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
{emqx_broker,1}.
|
||||
{emqx_cm,1}.
|
||||
{emqx_cm,2}.
|
||||
{emqx_cm,3}.
|
||||
{emqx_conf,1}.
|
||||
{emqx_conf,2}.
|
||||
{emqx_conf,3}.
|
||||
|
@ -26,6 +27,7 @@
|
|||
{emqx_ds,4}.
|
||||
{emqx_eviction_agent,1}.
|
||||
{emqx_eviction_agent,2}.
|
||||
{emqx_eviction_agent,3}.
|
||||
{emqx_exhook,1}.
|
||||
{emqx_ft_storage_exporter_fs,1}.
|
||||
{emqx_ft_storage_fs,1}.
|
||||
|
|
|
@ -64,6 +64,12 @@
|
|||
maybe_nack/1
|
||||
]).
|
||||
|
||||
%% Export for DS session GC worker and session implementations
|
||||
-export([
|
||||
will_delay_interval/1,
|
||||
prepare_will_message_for_publishing/2
|
||||
]).
|
||||
|
||||
%% Exports for CT
|
||||
-export([set_field/3]).
|
||||
|
||||
|
@ -584,11 +590,12 @@ process_connect(
|
|||
AckProps,
|
||||
Channel = #channel{
|
||||
conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo
|
||||
clientinfo = ClientInfo,
|
||||
will_msg = MaybeWillMsg
|
||||
}
|
||||
) ->
|
||||
#{clean_start := CleanStart} = ConnInfo,
|
||||
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
||||
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, MaybeWillMsg) of
|
||||
{ok, #{session := Session, present := false}} ->
|
||||
NChannel = Channel#channel{session = Session},
|
||||
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
|
||||
|
@ -884,9 +891,10 @@ do_unsubscribe(
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% MQTT-v5.0: 3.14.4 DISCONNECT Actions
|
||||
maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
|
||||
maybe_clean_will_msg(?RC_SUCCESS, Channel = #channel{session = Session0}) ->
|
||||
%% [MQTT-3.14.4-3]
|
||||
Channel#channel{will_msg = undefined};
|
||||
Session = emqx_session:clear_will_message(Session0),
|
||||
Channel#channel{will_msg = undefined, session = Session};
|
||||
maybe_clean_will_msg(_ReasonCode, Channel) ->
|
||||
Channel.
|
||||
|
||||
|
@ -1019,7 +1027,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn
|
|||
[ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
|
||||
AckProps
|
||||
),
|
||||
|
||||
return_connack(
|
||||
?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
||||
ensure_keepalive(NAckProps, Channel)
|
||||
|
@ -1204,6 +1211,9 @@ handle_call(
|
|||
),
|
||||
Channel0 = maybe_publish_will_msg(takenover, Channel),
|
||||
disconnect_and_shutdown(takenover, AllPendings, Channel0);
|
||||
handle_call(takeover_kick, Channel) ->
|
||||
Channel0 = maybe_publish_will_msg(takenover, Channel),
|
||||
disconnect_and_shutdown(takenover, ok, Channel0);
|
||||
handle_call(list_authz_cache, Channel) ->
|
||||
{reply, emqx_authz_cache:list_authz_cache(), Channel};
|
||||
handle_call(
|
||||
|
@ -1378,9 +1388,9 @@ handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) ->
|
|||
handle_timeout(
|
||||
_TRef,
|
||||
will_message = TimerName,
|
||||
Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
|
||||
Channel = #channel{will_msg = WillMsg}
|
||||
) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(Channel),
|
||||
{ok, clean_timer(TimerName, Channel#channel{will_msg = undefined})};
|
||||
handle_timeout(
|
||||
_TRef,
|
||||
|
@ -2301,20 +2311,18 @@ maybe_publish_will_msg(
|
|||
Channel;
|
||||
maybe_publish_will_msg(
|
||||
_Reason,
|
||||
Channel = #channel{
|
||||
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg
|
||||
Channel0 = #channel{
|
||||
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}
|
||||
}
|
||||
) ->
|
||||
%% Unconditionally publish will message for MQTT 3.1.1
|
||||
?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
|
||||
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
|
||||
Channel#channel{will_msg = undefined};
|
||||
Channel = publish_will_msg(Channel0),
|
||||
remove_willmsg(Channel);
|
||||
maybe_publish_will_msg(
|
||||
Reason,
|
||||
Channel = #channel{
|
||||
clientinfo = ClientInfo,
|
||||
conninfo = #{clientid := ClientId},
|
||||
will_msg = WillMsg
|
||||
Channel0 = #channel{
|
||||
conninfo = #{clientid := ClientId}
|
||||
}
|
||||
) when
|
||||
Reason =:= expired orelse
|
||||
|
@ -2331,13 +2339,20 @@ maybe_publish_will_msg(
|
|||
%% d. internal_error (maybe not recoverable)
|
||||
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
|
||||
%% OR fired but not yet handled
|
||||
%% NOTE! For durable sessions, `?chan_terminating' does NOT imply that the session is
|
||||
%% gone.
|
||||
case is_durable_session(Channel0) andalso Reason =:= ?chan_terminating of
|
||||
false ->
|
||||
?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
|
||||
_ = publish_will_msg(ClientInfo, WillMsg),
|
||||
|
||||
Channel = publish_will_msg(Channel0),
|
||||
remove_willmsg(Channel);
|
||||
true ->
|
||||
Channel0
|
||||
end;
|
||||
maybe_publish_will_msg(
|
||||
takenover,
|
||||
Channel = #channel{
|
||||
clientinfo = ClientInfo,
|
||||
Channel0 = #channel{
|
||||
will_msg = WillMsg,
|
||||
conninfo = #{clientid := ClientId}
|
||||
}
|
||||
|
@ -2355,7 +2370,8 @@ maybe_publish_will_msg(
|
|||
case will_delay_interval(WillMsg) of
|
||||
0 ->
|
||||
?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
|
||||
_ = publish_will_msg(ClientInfo, WillMsg);
|
||||
Channel = publish_will_msg(Channel0),
|
||||
ok;
|
||||
I when I > 0 ->
|
||||
%% @NOTE Non-normative comment in MQTT 5.0 spec
|
||||
%% """
|
||||
|
@ -2364,13 +2380,13 @@ maybe_publish_will_msg(
|
|||
%% before the Will Message is published.
|
||||
%% """
|
||||
?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}),
|
||||
Channel = Channel0,
|
||||
skip
|
||||
end,
|
||||
remove_willmsg(Channel);
|
||||
maybe_publish_will_msg(
|
||||
Reason,
|
||||
Channel = #channel{
|
||||
clientinfo = ClientInfo,
|
||||
Channel0 = #channel{
|
||||
will_msg = WillMsg,
|
||||
conninfo = #{clientid := ClientId}
|
||||
}
|
||||
|
@ -2381,11 +2397,11 @@ maybe_publish_will_msg(
|
|||
?tp(debug, maybe_publish_will_msg_other_publish, #{
|
||||
clientid => ClientId, reason => Reason
|
||||
}),
|
||||
_ = publish_will_msg(ClientInfo, WillMsg),
|
||||
Channel = publish_will_msg(Channel0),
|
||||
remove_willmsg(Channel);
|
||||
I when I > 0 ->
|
||||
?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
|
||||
ensure_timer(will_message, timer:seconds(I), Channel)
|
||||
ensure_timer(will_message, timer:seconds(I), Channel0)
|
||||
end.
|
||||
|
||||
will_delay_interval(WillMsg) ->
|
||||
|
@ -2396,14 +2412,17 @@ will_delay_interval(WillMsg) ->
|
|||
).
|
||||
|
||||
publish_will_msg(
|
||||
ClientInfo = #{mountpoint := MountPoint},
|
||||
Msg = #message{topic = Topic}
|
||||
#channel{
|
||||
session = Session,
|
||||
clientinfo = ClientInfo,
|
||||
will_msg = Msg = #message{topic = Topic}
|
||||
} = Channel
|
||||
) ->
|
||||
Action = authz_action(Msg),
|
||||
PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
|
||||
ClientBanned = emqx_banned:check(ClientInfo),
|
||||
case PublishingDisallowed orelse ClientBanned of
|
||||
true ->
|
||||
case prepare_will_message_for_publishing(ClientInfo, Msg) of
|
||||
{ok, PreparedMessage} ->
|
||||
NSession = emqx_session:publish_will_message_now(Session, PreparedMessage),
|
||||
Channel#channel{session = NSession};
|
||||
{error, #{client_banned := ClientBanned, publishing_disallowed := PublishingDisallowed}} ->
|
||||
?tp(
|
||||
warning,
|
||||
last_will_testament_publish_denied,
|
||||
|
@ -2413,12 +2432,23 @@ publish_will_msg(
|
|||
publishing_disallowed => PublishingDisallowed
|
||||
}
|
||||
),
|
||||
ok;
|
||||
Channel
|
||||
end.
|
||||
|
||||
prepare_will_message_for_publishing(
|
||||
ClientInfo = #{mountpoint := MountPoint},
|
||||
Msg = #message{topic = Topic}
|
||||
) ->
|
||||
Action = authz_action(Msg),
|
||||
PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
|
||||
ClientBanned = emqx_banned:check(ClientInfo),
|
||||
case PublishingDisallowed orelse ClientBanned of
|
||||
true ->
|
||||
{error, #{client_banned => ClientBanned, publishing_disallowed => PublishingDisallowed}};
|
||||
false ->
|
||||
NMsg = emqx_mountpoint:mount(MountPoint, Msg),
|
||||
NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)},
|
||||
_ = emqx_broker:publish(NMsg2),
|
||||
ok
|
||||
PreparedMessage = NMsg#message{timestamp = emqx_message:timestamp_now()},
|
||||
{ok, PreparedMessage}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -2530,6 +2560,15 @@ remove_willmsg(Channel = #channel{timers = Timers}) ->
|
|||
timers = maps:remove(will_message, Timers)
|
||||
}
|
||||
end.
|
||||
|
||||
is_durable_session(#channel{session = Session}) ->
|
||||
case emqx_session:info(impl, Session) of
|
||||
emqx_persistent_session_ds ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -47,13 +47,14 @@
|
|||
]).
|
||||
|
||||
-export([
|
||||
open_session/3,
|
||||
open_session/4,
|
||||
discard_session/1,
|
||||
discard_session/2,
|
||||
takeover_session_begin/1,
|
||||
takeover_session_end/1,
|
||||
kick_session/1,
|
||||
kick_session/2
|
||||
kick_session/2,
|
||||
takeover_kick/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -100,6 +101,7 @@
|
|||
takeover_session/2,
|
||||
takeover_finish/2,
|
||||
do_kick_session/3,
|
||||
do_takeover_kick_session_v3/2,
|
||||
do_get_chan_info/2,
|
||||
do_get_chan_stats/2,
|
||||
do_get_chann_conn_mod/2
|
||||
|
@ -110,6 +112,8 @@
|
|||
chan_pid/0
|
||||
]).
|
||||
|
||||
-type message() :: emqx_types:message().
|
||||
|
||||
-type chan_pid() :: pid().
|
||||
|
||||
-type channel_info() :: {
|
||||
|
@ -120,6 +124,8 @@
|
|||
|
||||
-type takeover_state() :: {_ConnMod :: module(), _ChanPid :: pid()}.
|
||||
|
||||
-define(BPAPI_NAME, emqx_cm).
|
||||
|
||||
-define(CHAN_STATS, [
|
||||
{?CHAN_TAB, 'channels.count', 'channels.max'},
|
||||
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
|
||||
|
@ -266,24 +272,29 @@ set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) ->
|
|||
end.
|
||||
|
||||
%% @doc Open a session.
|
||||
-spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) ->
|
||||
-spec open_session(
|
||||
_CleanStart :: boolean(),
|
||||
emqx_types:clientinfo(),
|
||||
emqx_types:conninfo(),
|
||||
emqx_maybe:t(message())
|
||||
) ->
|
||||
{ok, #{
|
||||
session := emqx_session:t(),
|
||||
present := boolean(),
|
||||
replay => _ReplayContext
|
||||
}}
|
||||
| {error, Reason :: term()}.
|
||||
open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
|
||||
Self = self(),
|
||||
emqx_cm_locker:trans(ClientId, fun(_) ->
|
||||
ok = discard_session(ClientId),
|
||||
ok = emqx_session:destroy(ClientInfo, ConnInfo),
|
||||
create_register_session(ClientInfo, ConnInfo, Self)
|
||||
create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, Self)
|
||||
end);
|
||||
open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
|
||||
Self = self(),
|
||||
emqx_cm_locker:trans(ClientId, fun(_) ->
|
||||
case emqx_session:open(ClientInfo, ConnInfo) of
|
||||
case emqx_session:open(ClientInfo, ConnInfo, MaybeWillMsg) of
|
||||
{true, Session, ReplayContext} ->
|
||||
ok = register_channel(ClientId, Self, ConnInfo),
|
||||
{ok, #{session => Session, present => true, replay => ReplayContext}};
|
||||
|
@ -293,8 +304,8 @@ open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo
|
|||
end
|
||||
end).
|
||||
|
||||
create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) ->
|
||||
Session = emqx_session:create(ClientInfo, ConnInfo),
|
||||
create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) ->
|
||||
Session = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg),
|
||||
ok = register_channel(ClientId, ChanPid, ConnInfo),
|
||||
{ok, #{session => Session, present => false}}.
|
||||
|
||||
|
@ -345,6 +356,38 @@ pick_channel(ClientId) ->
|
|||
ChanPid
|
||||
end.
|
||||
|
||||
%% Used by `emqx_persistent_session_ds'
|
||||
-spec takeover_kick(emqx_types:clientid()) -> ok.
|
||||
takeover_kick(ClientId) ->
|
||||
case lookup_channels(ClientId) of
|
||||
[] ->
|
||||
ok;
|
||||
ChanPids ->
|
||||
lists:foreach(
|
||||
fun(Pid) ->
|
||||
do_takeover_session(ClientId, Pid)
|
||||
end,
|
||||
ChanPids
|
||||
)
|
||||
end.
|
||||
|
||||
%% Used by `emqx_persistent_session_ds'.
|
||||
%% We stop any running channels with reason `takenover' so that correct reason codes and
|
||||
%% will message processing may take place. For older BPAPI nodes, we don't have much
|
||||
%% choice other than calling the old `discard_session' code.
|
||||
do_takeover_session(ClientId, Pid) ->
|
||||
Node = node(Pid),
|
||||
case emqx_bpapi:supported_version(Node, ?BPAPI_NAME) of
|
||||
undefined ->
|
||||
%% Race: node (re)starting? Assume v2.
|
||||
discard_session(ClientId, Pid);
|
||||
Vsn when Vsn =< 2 ->
|
||||
discard_session(ClientId, Pid);
|
||||
_Vsn ->
|
||||
takeover_kick_session(ClientId, Pid)
|
||||
end.
|
||||
|
||||
%% Used only by `emqx_session_mem'
|
||||
takeover_finish(ConnMod, ChanPid) ->
|
||||
request_stepdown(
|
||||
{takeover, 'end'},
|
||||
|
@ -353,6 +396,7 @@ takeover_finish(ConnMod, ChanPid) ->
|
|||
).
|
||||
|
||||
%% @doc RPC Target @ emqx_cm_proto_v2:takeover_session/2
|
||||
%% Used only by `emqx_session_mem'
|
||||
takeover_session(ClientId, Pid) ->
|
||||
try
|
||||
do_takeover_begin(ClientId, Pid)
|
||||
|
@ -408,7 +452,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
|||
| {ok, emqx_session:t() | _ReplayContext}
|
||||
| {error, term()}
|
||||
when
|
||||
Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
|
||||
Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
|
||||
request_stepdown(Action, ConnMod, Pid) ->
|
||||
Timeout =
|
||||
case Action == kick orelse Action == discard of
|
||||
|
@ -489,7 +533,19 @@ do_kick_session(Action, ClientId, ChanPid) when node(ChanPid) =:= node() ->
|
|||
ok = request_stepdown(Action, ConnMod, ChanPid)
|
||||
end.
|
||||
|
||||
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
|
||||
%% @doc RPC Target for emqx_cm_proto_v3:takeover_kick_session/3
|
||||
-spec do_takeover_kick_session_v3(emqx_types:clientid(), chan_pid()) -> ok.
|
||||
do_takeover_kick_session_v3(ClientId, ChanPid) when node(ChanPid) =:= node() ->
|
||||
case do_get_chann_conn_mod(ClientId, ChanPid) of
|
||||
undefined ->
|
||||
%% already deregistered
|
||||
ok;
|
||||
ConnMod when is_atom(ConnMod) ->
|
||||
ok = request_stepdown(takeover_kick, ConnMod, ChanPid)
|
||||
end.
|
||||
|
||||
%% @private This function is shared for session `kick' and `discard' (as the first arg
|
||||
%% Action).
|
||||
kick_session(Action, ClientId, ChanPid) ->
|
||||
try
|
||||
wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid))
|
||||
|
@ -512,6 +568,28 @@ kick_session(Action, ClientId, ChanPid) ->
|
|||
)
|
||||
end.
|
||||
|
||||
takeover_kick_session(ClientId, ChanPid) ->
|
||||
try
|
||||
wrap_rpc(emqx_cm_proto_v3:takeover_kick_session(ClientId, ChanPid))
|
||||
catch
|
||||
Error:Reason ->
|
||||
%% This should mostly be RPC failures.
|
||||
%% However, if the node is still running the old version
|
||||
%% code (prior to emqx app 4.3.10) some of the RPC handler
|
||||
%% exceptions may get propagated to a new version node
|
||||
?SLOG(
|
||||
error,
|
||||
#{
|
||||
msg => "failed_to_kick_session_on_remote_node",
|
||||
node => node(ChanPid),
|
||||
action => takeover,
|
||||
error => Error,
|
||||
reason => Reason
|
||||
},
|
||||
#{clientid => ClientId}
|
||||
)
|
||||
end.
|
||||
|
||||
kick_session(ClientId) ->
|
||||
case lookup_channels(ClientId) of
|
||||
[] ->
|
||||
|
|
|
@ -34,8 +34,8 @@
|
|||
|
||||
%% Session API
|
||||
-export([
|
||||
create/3,
|
||||
open/3,
|
||||
create/4,
|
||||
open/4,
|
||||
destroy/1
|
||||
]).
|
||||
|
||||
|
@ -66,6 +66,12 @@
|
|||
terminate/2
|
||||
]).
|
||||
|
||||
%% Will message handling
|
||||
-export([
|
||||
clear_will_message/1,
|
||||
publish_will_message_now/2
|
||||
]).
|
||||
|
||||
%% Managment APIs:
|
||||
-export([
|
||||
list_client_subscriptions/1
|
||||
|
@ -88,7 +94,7 @@
|
|||
|
||||
-ifdef(TEST).
|
||||
-export([
|
||||
session_open/2,
|
||||
session_open/4,
|
||||
list_all_sessions/0
|
||||
]).
|
||||
-endif.
|
||||
|
@ -155,6 +161,7 @@
|
|||
|
||||
-type stream_state() :: #srs{}.
|
||||
|
||||
-type message() :: emqx_types:message().
|
||||
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
|
||||
-type millisecond() :: non_neg_integer().
|
||||
-type clientinfo() :: emqx_types:clientinfo().
|
||||
|
@ -181,22 +188,22 @@
|
|||
|
||||
%%
|
||||
|
||||
-spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||
-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
|
||||
session().
|
||||
create(#{clientid := ClientID}, ConnInfo, Conf) ->
|
||||
ensure_timers(session_ensure_new(ClientID, ConnInfo, Conf)).
|
||||
create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
||||
ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)).
|
||||
|
||||
-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||
-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
|
||||
{_IsPresent :: true, session(), []} | false.
|
||||
open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) ->
|
||||
open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
||||
%% NOTE
|
||||
%% The fact that we need to concern about discarding all live channels here
|
||||
%% is essentially a consequence of the in-memory session design, where we
|
||||
%% have disconnected channels holding onto session state. Ideally, we should
|
||||
%% somehow isolate those idling not-yet-expired sessions into a separate process
|
||||
%% space, and move this call back into `emqx_cm` where it belongs.
|
||||
ok = emqx_cm:discard_session(ClientID),
|
||||
case session_open(ClientID, ConnInfo) of
|
||||
ok = emqx_cm:takeover_kick(ClientID),
|
||||
case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
|
||||
Session0 = #{} ->
|
||||
Session = Session0#{props => Conf},
|
||||
{true, ensure_timers(Session), []};
|
||||
|
@ -607,7 +614,8 @@ disconnect(Session = #{s := S0}, ConnInfo) ->
|
|||
{shutdown, Session#{s => S}}.
|
||||
|
||||
-spec terminate(Reason :: term(), session()) -> ok.
|
||||
terminate(_Reason, _Session = #{id := Id, s := S}) ->
|
||||
terminate(_Reason, Session = #{id := Id, s := S}) ->
|
||||
maybe_set_will_message_timer(Session),
|
||||
_ = emqx_persistent_session_ds_state:commit(S),
|
||||
?tp(debug, persistent_session_ds_terminate, #{id => Id}),
|
||||
ok.
|
||||
|
@ -679,9 +687,9 @@ sync(ClientId) ->
|
|||
%%
|
||||
%% Note: session API doesn't handle session takeovers, it's the job of
|
||||
%% the broker.
|
||||
-spec session_open(id(), emqx_types:conninfo()) ->
|
||||
-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
|
||||
session() | false.
|
||||
session_open(SessionId, NewConnInfo) ->
|
||||
session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
|
||||
NowMS = now_ms(),
|
||||
case emqx_persistent_session_ds_state:open(SessionId) of
|
||||
{ok, S0} ->
|
||||
|
@ -699,7 +707,9 @@ session_open(SessionId, NewConnInfo) ->
|
|||
S3 = emqx_persistent_session_ds_state:set_peername(
|
||||
maps:get(peername, NewConnInfo), S2
|
||||
),
|
||||
S = emqx_persistent_session_ds_state:commit(S3),
|
||||
S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
|
||||
S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4),
|
||||
S = emqx_persistent_session_ds_state:commit(S5),
|
||||
Inflight = emqx_persistent_session_ds_inflight:new(
|
||||
receive_maximum(NewConnInfo)
|
||||
),
|
||||
|
@ -714,9 +724,15 @@ session_open(SessionId, NewConnInfo) ->
|
|||
false
|
||||
end.
|
||||
|
||||
-spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) ->
|
||||
-spec session_ensure_new(
|
||||
id(),
|
||||
emqx_types:clientinfo(),
|
||||
emqx_types:conninfo(),
|
||||
emqx_maybe:t(message()),
|
||||
emqx_session:conf()
|
||||
) ->
|
||||
session().
|
||||
session_ensure_new(Id, ConnInfo, Conf) ->
|
||||
session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
||||
?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
|
||||
Now = now_ms(),
|
||||
S0 = emqx_persistent_session_ds_state:create_new(Id),
|
||||
|
@ -738,7 +754,9 @@ session_ensure_new(Id, ConnInfo, Conf) ->
|
|||
?committed(?QOS_2)
|
||||
]
|
||||
),
|
||||
S = emqx_persistent_session_ds_state:commit(S4),
|
||||
S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
|
||||
S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5),
|
||||
S = emqx_persistent_session_ds_state:commit(S6),
|
||||
#{
|
||||
id => Id,
|
||||
props => Conf,
|
||||
|
@ -1191,6 +1209,34 @@ seqno_diff(?QOS_1, A, B) ->
|
|||
seqno_diff(?QOS_2, A, B) ->
|
||||
A - B.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Will message handling
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec clear_will_message(session()) -> session().
|
||||
clear_will_message(#{s := S0} = Session) ->
|
||||
S = emqx_persistent_session_ds_state:clear_will_message(S0),
|
||||
Session#{s := S}.
|
||||
|
||||
-spec publish_will_message_now(session(), message()) -> session().
|
||||
publish_will_message_now(#{} = Session, WillMsg = #message{}) ->
|
||||
_ = emqx_broker:publish(WillMsg),
|
||||
clear_will_message(Session).
|
||||
|
||||
maybe_set_will_message_timer(#{id := SessionId, s := S}) ->
|
||||
case emqx_persistent_session_ds_state:get_will_message(S) of
|
||||
#message{} = WillMsg ->
|
||||
WillDelayInterval = emqx_channel:will_delay_interval(WillMsg),
|
||||
WillDelayInterval > 0 andalso
|
||||
emqx_persistent_session_ds_gc_worker:check_session_after(
|
||||
SessionId,
|
||||
timer:seconds(WillDelayInterval)
|
||||
),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -75,5 +75,7 @@
|
|||
%% Unique integer used to create unique identities
|
||||
-define(last_id, last_id).
|
||||
-define(peername, peername).
|
||||
-define(will_message, will_message).
|
||||
-define(clientinfo, clientinfo).
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -25,7 +25,9 @@
|
|||
|
||||
%% API
|
||||
-export([
|
||||
start_link/0
|
||||
start_link/0,
|
||||
check_session/1,
|
||||
check_session_after/2
|
||||
]).
|
||||
|
||||
%% `gen_server' API
|
||||
|
@ -38,6 +40,7 @@
|
|||
|
||||
%% call/cast/info records
|
||||
-record(gc, {}).
|
||||
-record(check_session, {id :: emqx_persistent_session_ds:id()}).
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% API
|
||||
|
@ -46,6 +49,17 @@
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec check_session(emqx_persistent_session_ds:id()) -> ok.
|
||||
check_session(SessionId) ->
|
||||
gen_server:cast(?MODULE, #check_session{id = SessionId}).
|
||||
|
||||
-spec check_session_after(emqx_persistent_session_ds:id(), pos_integer()) -> ok.
|
||||
check_session_after(SessionId, Time0) ->
|
||||
#{bump_interval := BumpInterval} = gc_context(),
|
||||
Time = max(Time0, BumpInterval),
|
||||
_ = erlang:send_after(Time, ?MODULE, #check_session{id = SessionId}),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% `gen_server' API
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
@ -58,6 +72,9 @@ init(_Opts) ->
|
|||
handle_call(_Call, _From, State) ->
|
||||
{reply, error, State}.
|
||||
|
||||
handle_cast(#check_session{id = SessionId}, State) ->
|
||||
do_check_session(SessionId),
|
||||
{noreply, State};
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
@ -65,6 +82,9 @@ handle_info(#gc{}, State) ->
|
|||
try_gc(),
|
||||
ensure_gc_timer(),
|
||||
{noreply, State};
|
||||
handle_info(#check_session{id = SessionId}, State) ->
|
||||
do_check_session(SessionId),
|
||||
{noreply, State};
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
@ -104,25 +124,65 @@ now_ms() ->
|
|||
erlang:system_time(millisecond).
|
||||
|
||||
start_gc() ->
|
||||
#{
|
||||
min_last_alive := MinLastAlive,
|
||||
min_last_alive_will_msg := MinLastAliveWillMsg
|
||||
} = gc_context(),
|
||||
gc_loop(
|
||||
MinLastAlive, MinLastAliveWillMsg, emqx_persistent_session_ds_state:make_session_iterator()
|
||||
).
|
||||
|
||||
gc_context() ->
|
||||
GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
|
||||
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
|
||||
TimeThreshold = max(GCInterval, BumpInterval) * 3,
|
||||
MinLastAlive = now_ms() - TimeThreshold,
|
||||
gc_loop(MinLastAlive, emqx_persistent_session_ds_state:make_session_iterator()).
|
||||
NowMS = now_ms(),
|
||||
#{
|
||||
min_last_alive => NowMS - TimeThreshold,
|
||||
%% For will messages, we don't need to be so strict as session GC (GC interval is
|
||||
%% of the order of ~ 10 minutes by default, bump interval ~ 100 ms), otherwise
|
||||
%% most will be sent very late.
|
||||
min_last_alive_will_msg => NowMS - BumpInterval * 5,
|
||||
time_threshold => TimeThreshold,
|
||||
bump_interval => BumpInterval,
|
||||
gc_interval => GCInterval
|
||||
}.
|
||||
|
||||
gc_loop(MinLastAlive, It0) ->
|
||||
gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) ->
|
||||
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
|
||||
case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
|
||||
{[], _It} ->
|
||||
ok;
|
||||
{Sessions, It} ->
|
||||
[do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions],
|
||||
gc_loop(MinLastAlive, It)
|
||||
[
|
||||
do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata)
|
||||
|| {SessionId, Metadata} <- Sessions
|
||||
],
|
||||
gc_loop(MinLastAlive, MinLastAliveWillMsg, It)
|
||||
end.
|
||||
|
||||
do_gc(SessionId, MinLastAlive, Metadata) ->
|
||||
#{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata,
|
||||
case LastAliveAt + EI < MinLastAlive of
|
||||
do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) ->
|
||||
#{
|
||||
?last_alive_at := LastAliveAt,
|
||||
?expiry_interval := EI,
|
||||
?will_message := MaybeWillMessage,
|
||||
?clientinfo := ClientInfo
|
||||
} = Metadata,
|
||||
IsExpired = LastAliveAt + EI < MinLastAlive,
|
||||
case
|
||||
should_send_will_message(
|
||||
MaybeWillMessage, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg
|
||||
)
|
||||
of
|
||||
{true, PreparedMessage} ->
|
||||
_ = emqx_broker:publish(PreparedMessage),
|
||||
ok = emqx_persistent_session_ds_state:clear_will_message_now(SessionId),
|
||||
?tp(session_gc_published_will_msg, #{id => SessionId, msg => PreparedMessage}),
|
||||
ok;
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
case IsExpired of
|
||||
true ->
|
||||
emqx_persistent_session_ds:destroy_session(SessionId),
|
||||
?tp(debug, ds_session_gc_cleaned, #{
|
||||
|
@ -134,3 +194,35 @@ do_gc(SessionId, MinLastAlive, Metadata) ->
|
|||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
should_send_will_message(
|
||||
undefined = _WillMsg, _ClientInfo, _IsExpired, _LastAliveAt, _MinLastAliveWillMsg
|
||||
) ->
|
||||
false;
|
||||
should_send_will_message(WillMsg, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg) ->
|
||||
WillDelayIntervalS = emqx_channel:will_delay_interval(WillMsg),
|
||||
WillDelayInterval = timer:seconds(WillDelayIntervalS),
|
||||
PastWillDelay = LastAliveAt + WillDelayInterval < MinLastAliveWillMsg,
|
||||
case PastWillDelay orelse IsExpired of
|
||||
true ->
|
||||
case emqx_channel:prepare_will_message_for_publishing(ClientInfo, WillMsg) of
|
||||
{ok, PreparedMessage} ->
|
||||
{true, PreparedMessage};
|
||||
{error, _} ->
|
||||
false
|
||||
end;
|
||||
false ->
|
||||
false
|
||||
end.
|
||||
|
||||
do_check_session(SessionId) ->
|
||||
case emqx_persistent_session_ds_state:print_session(SessionId) of
|
||||
#{metadata := Metadata} ->
|
||||
#{
|
||||
min_last_alive := MinLastAlive,
|
||||
min_last_alive_will_msg := MinLastAliveWillMsg
|
||||
} = gc_context(),
|
||||
do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -30,6 +30,8 @@
|
|||
-export([get_created_at/1, set_created_at/2]).
|
||||
-export([get_last_alive_at/1, set_last_alive_at/2]).
|
||||
-export([get_expiry_interval/1, set_expiry_interval/2]).
|
||||
-export([get_clientinfo/1, set_clientinfo/2]).
|
||||
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
|
||||
-export([get_peername/1, set_peername/2]).
|
||||
-export([new_id/1]).
|
||||
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
|
||||
|
@ -58,6 +60,8 @@
|
|||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
-type message() :: emqx_types:message().
|
||||
|
||||
-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).
|
||||
|
||||
-opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
|
||||
|
@ -288,6 +292,39 @@ get_peername(Rec) ->
|
|||
set_peername(Val, Rec) ->
|
||||
set_meta(?peername, Val, Rec).
|
||||
|
||||
-spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()).
|
||||
get_clientinfo(Rec) ->
|
||||
get_meta(?clientinfo, Rec).
|
||||
|
||||
-spec set_clientinfo(emqx_types:clientinfo(), t()) -> t().
|
||||
set_clientinfo(Val, Rec) ->
|
||||
set_meta(?clientinfo, Val, Rec).
|
||||
|
||||
-spec get_will_message(t()) -> emqx_maybe:t(message()).
|
||||
get_will_message(Rec) ->
|
||||
get_meta(?will_message, Rec).
|
||||
|
||||
-spec set_will_message(emqx_maybe:t(message()), t()) -> t().
|
||||
set_will_message(Val, Rec) ->
|
||||
set_meta(?will_message, Val, Rec).
|
||||
|
||||
-spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok.
|
||||
clear_will_message_now(SessionId) when is_binary(SessionId) ->
|
||||
transaction(fun() ->
|
||||
case kv_restore(?session_tab, SessionId) of
|
||||
[Metadata0] ->
|
||||
Metadata = Metadata0#{?will_message => undefined},
|
||||
kv_persist(?session_tab, SessionId, Metadata),
|
||||
ok;
|
||||
[] ->
|
||||
ok
|
||||
end
|
||||
end).
|
||||
|
||||
-spec clear_will_message(t()) -> t().
|
||||
clear_will_message(Rec) ->
|
||||
set_will_message(undefined, Rec).
|
||||
|
||||
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
|
||||
new_id(Rec) ->
|
||||
LastId =
|
||||
|
|
|
@ -55,8 +55,8 @@
|
|||
-endif.
|
||||
|
||||
-export([
|
||||
create/2,
|
||||
open/2,
|
||||
create/3,
|
||||
open/3,
|
||||
destroy/1,
|
||||
destroy/2
|
||||
]).
|
||||
|
@ -88,6 +88,12 @@
|
|||
terminate/3
|
||||
]).
|
||||
|
||||
%% Will message handling
|
||||
-export([
|
||||
clear_will_message/1,
|
||||
publish_will_message_now/2
|
||||
]).
|
||||
|
||||
% Timers
|
||||
-export([
|
||||
ensure_timer/3,
|
||||
|
@ -175,57 +181,58 @@
|
|||
%% Behaviour
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-callback create(clientinfo(), conninfo(), conf()) ->
|
||||
-callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
|
||||
t().
|
||||
-callback open(clientinfo(), conninfo(), conf()) ->
|
||||
-callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
|
||||
{_IsPresent :: true, t(), _ReplayContext} | false.
|
||||
-callback destroy(t() | clientinfo()) -> ok.
|
||||
-callback clear_will_message(t()) -> t().
|
||||
-callback publish_will_message_now(t(), message()) -> t().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Create a Session
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec create(clientinfo(), conninfo()) -> t().
|
||||
create(ClientInfo, ConnInfo) ->
|
||||
-spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) -> t().
|
||||
create(ClientInfo, ConnInfo, MaybeWillMsg) ->
|
||||
Conf = get_session_conf(ClientInfo),
|
||||
create(ClientInfo, ConnInfo, Conf).
|
||||
|
||||
create(ClientInfo, ConnInfo, Conf) ->
|
||||
% FIXME error conditions
|
||||
create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf).
|
||||
create(
|
||||
hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, MaybeWillMsg, Conf
|
||||
).
|
||||
|
||||
create(Mod, ClientInfo, ConnInfo, Conf) ->
|
||||
create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
||||
% FIXME error conditions
|
||||
Session = Mod:create(ClientInfo, ConnInfo, Conf),
|
||||
Session = Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf),
|
||||
ok = emqx_metrics:inc('session.created'),
|
||||
ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]),
|
||||
Session.
|
||||
|
||||
-spec open(clientinfo(), conninfo()) ->
|
||||
-spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) ->
|
||||
{_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
|
||||
open(ClientInfo, ConnInfo) ->
|
||||
open(ClientInfo, ConnInfo, MaybeWillMsg) ->
|
||||
Conf = get_session_conf(ClientInfo),
|
||||
Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo),
|
||||
%% NOTE
|
||||
%% Try to look the existing session up in session stores corresponding to the given
|
||||
%% `Mods` in order, starting from the last one.
|
||||
case try_open(Mods, ClientInfo, ConnInfo, Conf) of
|
||||
case try_open(Mods, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
|
||||
{_IsPresent = true, _, _} = Present ->
|
||||
Present;
|
||||
false ->
|
||||
%% NOTE
|
||||
%% Nothing was found, create a new session with the `Default` implementation.
|
||||
{false, create(Default, ClientInfo, ConnInfo, Conf)}
|
||||
{false, create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf)}
|
||||
end.
|
||||
|
||||
try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) ->
|
||||
case try_open(Rest, ClientInfo, ConnInfo, Conf) of
|
||||
try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
||||
case try_open(Rest, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
|
||||
{_IsPresent = true, _, _} = Present ->
|
||||
Present;
|
||||
false ->
|
||||
Mod:open(ClientInfo, ConnInfo, Conf)
|
||||
Mod:open(ClientInfo, ConnInfo, MaybeWillMsg, Conf)
|
||||
end;
|
||||
try_open([], _ClientInfo, _ConnInfo, _Conf) ->
|
||||
try_open([], _ClientInfo, _ConnInfo, _MaybeWillMsg, _Conf) ->
|
||||
false.
|
||||
|
||||
-spec get_session_conf(clientinfo()) -> conf().
|
||||
|
@ -635,3 +642,15 @@ choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) ->
|
|||
run_hook(Name, Args) ->
|
||||
ok = emqx_metrics:inc(Name),
|
||||
emqx_hooks:run(Name, Args).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Will message handling
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec clear_will_message(t()) -> t().
|
||||
clear_will_message(Session) ->
|
||||
?IMPL(Session):clear_will_message(Session).
|
||||
|
||||
-spec publish_will_message_now(t(), message()) -> t().
|
||||
publish_will_message_now(Session, WillMsg) ->
|
||||
?IMPL(Session):publish_will_message_now(Session, WillMsg).
|
||||
|
|
|
@ -58,8 +58,8 @@
|
|||
-endif.
|
||||
|
||||
-export([
|
||||
create/3,
|
||||
open/3,
|
||||
create/4,
|
||||
open/4,
|
||||
destroy/1
|
||||
]).
|
||||
|
||||
|
@ -106,6 +106,12 @@
|
|||
dedup/4
|
||||
]).
|
||||
|
||||
%% Will message handling
|
||||
-export([
|
||||
clear_will_message/1,
|
||||
publish_will_message_now/2
|
||||
]).
|
||||
|
||||
%% Export for CT
|
||||
-export([set_field/3]).
|
||||
|
||||
|
@ -127,6 +133,7 @@
|
|||
-type session() :: #session{}.
|
||||
-type replayctx() :: [emqx_types:message()].
|
||||
|
||||
-type message() :: emqx_types:message().
|
||||
-type clientinfo() :: emqx_types:clientinfo().
|
||||
-type conninfo() :: emqx_session:conninfo().
|
||||
-type replies() :: emqx_session:replies().
|
||||
|
@ -151,11 +158,12 @@
|
|||
%% Init a Session
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||
-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
|
||||
session().
|
||||
create(
|
||||
#{zone := Zone, clientid := ClientId},
|
||||
#{expiry_interval := EI, receive_maximum := ReceiveMax},
|
||||
_MaybeWillMsg,
|
||||
Conf
|
||||
) ->
|
||||
QueueOpts = get_mqueue_conf(Zone),
|
||||
|
@ -200,9 +208,9 @@ destroy(_Session) ->
|
|||
%% Open a (possibly existing) Session
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||
-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
|
||||
{_IsPresent :: true, session(), replayctx()} | _IsPresent :: false.
|
||||
open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) ->
|
||||
open(ClientInfo = #{clientid := ClientId}, ConnInfo, _MaybeWillMsg, Conf) ->
|
||||
case emqx_cm:takeover_session_begin(ClientId) of
|
||||
{ok, SessionRemote, TakeoverState} ->
|
||||
Session0 = resume(ClientInfo, SessionRemote),
|
||||
|
@ -797,6 +805,19 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
|
|||
next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
||||
Session#session{next_pkt_id = Id + 1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Will message handling
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec clear_will_message(session()) -> session().
|
||||
clear_will_message(#session{} = Session) ->
|
||||
Session.
|
||||
|
||||
-spec publish_will_message_now(session(), message()) -> session().
|
||||
publish_will_message_now(#session{} = Session, #message{} = WillMsg) ->
|
||||
_ = emqx_broker:publish(WillMsg),
|
||||
Session.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
deprecated_since/0,
|
||||
|
||||
lookup_client/2,
|
||||
kickout_client/2,
|
||||
|
@ -39,6 +40,9 @@
|
|||
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
deprecated_since() ->
|
||||
"5.7.0".
|
||||
|
||||
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
|
||||
kickout_client(Node, ClientId) ->
|
||||
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_cm_proto_v3).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
|
||||
lookup_client/2,
|
||||
kickout_client/2,
|
||||
|
||||
get_chan_stats/2,
|
||||
get_chan_info/2,
|
||||
get_chann_conn_mod/2,
|
||||
|
||||
takeover_session/2,
|
||||
takeover_finish/2,
|
||||
kick_session/3,
|
||||
|
||||
%% Introduced in v3
|
||||
takeover_kick_session/2
|
||||
]).
|
||||
|
||||
-include("bpapi.hrl").
|
||||
-include_lib("emqx/include/emqx_cm.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.7.0".
|
||||
|
||||
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
|
||||
kickout_client(Node, ClientId) ->
|
||||
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
|
||||
|
||||
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
|
||||
[emqx_cm:channel_info()] | {badrpc, _}.
|
||||
lookup_client(Node, Key) ->
|
||||
rpc:call(Node, emqx_cm, lookup_client, [Key]).
|
||||
|
||||
-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) ->
|
||||
emqx_types:stats() | undefined | {badrpc, _}.
|
||||
get_chan_stats(ClientId, ChanPid) ->
|
||||
rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
|
||||
|
||||
-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) ->
|
||||
emqx_types:infos() | undefined | {badrpc, _}.
|
||||
get_chan_info(ClientId, ChanPid) ->
|
||||
rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
|
||||
|
||||
-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) ->
|
||||
module() | undefined | {badrpc, _}.
|
||||
get_chann_conn_mod(ClientId, ChanPid) ->
|
||||
rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
|
||||
|
||||
-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
|
||||
none
|
||||
| {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
|
||||
%% NOTE: v5.3.0
|
||||
| {living, _ConnMod :: atom(), emqx_session:session()}
|
||||
| {expired | persistent, emqx_session:session()}
|
||||
| {badrpc, _}.
|
||||
takeover_session(ClientId, ChanPid) ->
|
||||
rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
|
||||
|
||||
-spec takeover_finish(module(), emqx_cm:chan_pid()) ->
|
||||
{ok, emqx_types:takeover_data()}
|
||||
| {ok, list(emqx_types:deliver())}
|
||||
| {error, term()}
|
||||
| {badrpc, _}.
|
||||
takeover_finish(ConnMod, ChanPid) ->
|
||||
erpc:call(
|
||||
node(ChanPid),
|
||||
emqx_cm,
|
||||
takeover_finish,
|
||||
[ConnMod, ChanPid],
|
||||
?T_TAKEOVER * 2
|
||||
).
|
||||
|
||||
-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
|
||||
kick_session(Action, ClientId, ChanPid) ->
|
||||
rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% Introduced in v3
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
||||
-spec takeover_kick_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
|
||||
ok | {badrpc, _}.
|
||||
takeover_kick_session(ClientId, ChanPid) ->
|
||||
rpc:call(
|
||||
node(ChanPid), emqx_cm, do_takeover_kick_session_v3, [ClientId, ChanPid], ?T_TAKEOVER * 2
|
||||
).
|
|
@ -288,13 +288,7 @@ t_handle_in_puback_id_not_found(_) ->
|
|||
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
||||
|
||||
t_bad_receive_maximum(_) ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
emqx_config:put_zone_conf(default, [mqtt, response_information], test),
|
||||
C1 = channel(#{conn_state => idle}),
|
||||
{shutdown, protocol_error, _, _} =
|
||||
|
@ -304,13 +298,7 @@ t_bad_receive_maximum(_) ->
|
|||
).
|
||||
|
||||
t_override_client_receive_maximum(_) ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
emqx_config:put_zone_conf(default, [mqtt, response_information], test),
|
||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0),
|
||||
C1 = channel(#{conn_state => idle}),
|
||||
|
@ -460,13 +448,7 @@ t_handle_in_expected_packet(_) ->
|
|||
emqx_channel:handle_in(packet, channel()).
|
||||
|
||||
t_process_connect(_) ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
|
||||
emqx_channel:process_connect(#{}, channel(#{conn_state => idle})).
|
||||
|
||||
|
@ -604,13 +586,7 @@ t_handle_out_connack_sucess(_) ->
|
|||
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
|
||||
|
||||
t_handle_out_connack_response_information(_) ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
emqx_config:put_zone_conf(default, [mqtt, response_information], test),
|
||||
IdleChannel = channel(#{conn_state => idle}),
|
||||
{ok,
|
||||
|
@ -624,13 +600,7 @@ t_handle_out_connack_response_information(_) ->
|
|||
).
|
||||
|
||||
t_handle_out_connack_not_response_information(_) ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
emqx_config:put_zone_conf(default, [mqtt, response_information], test),
|
||||
IdleChannel = channel(#{conn_state => idle}),
|
||||
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
|
||||
|
@ -1017,13 +987,7 @@ t_ws_cookie_init(_) ->
|
|||
t_flapping_detect(_) ->
|
||||
emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000),
|
||||
Parent = self(),
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
),
|
||||
mock_cm_open_session(),
|
||||
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
|
||||
IdleChannel = channel(
|
||||
clientinfo(#{
|
||||
|
@ -1128,7 +1092,8 @@ session(ClientInfo, InitFields) when is_map(InitFields) ->
|
|||
#{
|
||||
receive_maximum => 0,
|
||||
expiry_interval => 0
|
||||
}
|
||||
},
|
||||
_WillMsg = undefined
|
||||
),
|
||||
maps:fold(
|
||||
fun(Field, Value, SessionAcc) ->
|
||||
|
@ -1139,6 +1104,15 @@ session(ClientInfo, InitFields) when is_map(InitFields) ->
|
|||
InitFields
|
||||
).
|
||||
|
||||
mock_cm_open_session() ->
|
||||
ok = meck:expect(
|
||||
emqx_cm,
|
||||
open_session,
|
||||
fun(true, _ClientInfo, _ConnInfo, _MaybeWillMsg) ->
|
||||
{ok, #{session => session(), present => false}}
|
||||
end
|
||||
).
|
||||
|
||||
%% conn: 5/s; overall: 10/s
|
||||
quota() ->
|
||||
emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).
|
||||
|
|
|
@ -59,6 +59,13 @@ init_per_suite(Config) ->
|
|||
end_per_suite(Config) ->
|
||||
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
open_session(CleanStart, ClientInfo, ConnInfo) ->
|
||||
emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, _WillMsg = undefined).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% TODO: Add more test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -129,10 +136,10 @@ t_open_session(_) ->
|
|||
receive_maximum => 100
|
||||
},
|
||||
{ok, #{session := Session1, present := false}} =
|
||||
emqx_cm:open_session(true, ClientInfo, ConnInfo),
|
||||
open_session(true, ClientInfo, ConnInfo),
|
||||
?assertEqual(100, emqx_session:info(inflight_max, Session1)),
|
||||
{ok, #{session := Session2, present := false}} =
|
||||
emqx_cm:open_session(true, ClientInfo, ConnInfo),
|
||||
open_session(true, ClientInfo, ConnInfo),
|
||||
?assertEqual(100, emqx_session:info(inflight_max, Session2)),
|
||||
|
||||
emqx_cm:unregister_channel(<<"clientid">>),
|
||||
|
@ -163,7 +170,7 @@ t_open_session_race_condition(_) ->
|
|||
Parent = self(),
|
||||
OpenASession = fun() ->
|
||||
timer:sleep(rand:uniform(100)),
|
||||
OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)),
|
||||
OpenR = open_session(true, ClientInfo, ConnInfo),
|
||||
Parent ! OpenR,
|
||||
case OpenR of
|
||||
{ok, _} ->
|
||||
|
|
|
@ -679,7 +679,8 @@ channel(InitFields) ->
|
|||
},
|
||||
Session = emqx_session:create(
|
||||
ClientInfo,
|
||||
#{receive_maximum => 0, expiry_interval => 1000}
|
||||
#{receive_maximum => 0, expiry_interval => 1000},
|
||||
_WillMsg = undefined
|
||||
),
|
||||
maps:fold(
|
||||
fun(Field, Value, Channel) ->
|
||||
|
|
|
@ -150,6 +150,7 @@ when
|
|||
work_dir := file:name()
|
||||
}.
|
||||
start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
|
||||
emqx_common_test_helpers:clear_screen(),
|
||||
% 1. Prepare appspec instructions
|
||||
AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
|
||||
% 2. Load every app so that stuff scanning attributes of loaded modules works
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
@ -76,6 +77,7 @@ init_per_group(persistence_enabled, Config) ->
|
|||
" enable = true\n"
|
||||
" last_alive_update_interval = 100ms\n"
|
||||
" renew_streams_interval = 100ms\n"
|
||||
" session_gc_interval = 2s\n"
|
||||
"}"},
|
||||
{persistence, ds}
|
||||
| Config
|
||||
|
@ -1240,6 +1242,98 @@ t_multiple_subscription_matches(Config) ->
|
|||
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
||||
ok = emqtt:disconnect(Client2).
|
||||
|
||||
%% Check that we don't get a will message when the client disconnects with success reason
|
||||
%% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1.
|
||||
t_no_will_message(Config) ->
|
||||
ConnFun = ?config(conn_fun, Config),
|
||||
WillTopic = ?config(topic, Config),
|
||||
WillPayload = <<"will message">>,
|
||||
ClientId = ?config(client_id, Config),
|
||||
|
||||
?check_trace(
|
||||
#{timetrap => 15_000},
|
||||
begin
|
||||
ok = emqx:subscribe(WillTopic, #{qos => 2}),
|
||||
{ok, Client} = emqtt:start_link([
|
||||
{clientid, ClientId},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 1}},
|
||||
{will_topic, WillTopic},
|
||||
{will_payload, WillPayload},
|
||||
{will_qos, 1},
|
||||
{will_props, #{'Will-Delay-Interval' => 0}}
|
||||
| Config
|
||||
]),
|
||||
{ok, _} = emqtt:ConnFun(Client),
|
||||
ok = emqtt:disconnect(Client, ?RC_SUCCESS),
|
||||
|
||||
%% No will message
|
||||
?assertNotReceive({deliver, WillTopic, _}, 5_000),
|
||||
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
%% Check that we get a single will message when the client disconnects with a non
|
||||
%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
|
||||
%% QoS = 1.
|
||||
t_will_message1(Config) ->
|
||||
do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}),
|
||||
ok.
|
||||
|
||||
%% Check that we get a single will message when the client disconnects with a non
|
||||
%% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0,
|
||||
%% QoS = 1.
|
||||
t_will_message2(Config) ->
|
||||
do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}),
|
||||
ok.
|
||||
|
||||
%% Check that we get a single will message when the client disconnects with a non
|
||||
%% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0,
|
||||
%% QoS = 1.
|
||||
t_will_message3(Config) ->
|
||||
do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}),
|
||||
ok.
|
||||
|
||||
do_t_will_message(Config, Opts) ->
|
||||
#{
|
||||
session_expiry := SessionExpiry,
|
||||
will_delay := WillDelay
|
||||
} = Opts,
|
||||
ConnFun = ?config(conn_fun, Config),
|
||||
WillTopic = ?config(topic, Config),
|
||||
WillPayload = <<"will message">>,
|
||||
ClientId = ?config(client_id, Config),
|
||||
|
||||
?check_trace(
|
||||
#{timetrap => 15_000},
|
||||
begin
|
||||
ok = emqx:subscribe(WillTopic, #{qos => 2}),
|
||||
{ok, Client} = emqtt:start_link([
|
||||
{clientid, ClientId},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => SessionExpiry}},
|
||||
{will_topic, WillTopic},
|
||||
{will_payload, WillPayload},
|
||||
{will_qos, 1},
|
||||
{will_props, #{'Will-Delay-Interval' => WillDelay}}
|
||||
| Config
|
||||
]),
|
||||
{ok, _} = emqtt:ConnFun(Client),
|
||||
ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
|
||||
|
||||
?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000),
|
||||
%% No duplicates
|
||||
?assertNotReceive({deliver, WillTopic, _}, 100),
|
||||
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
get_topicwise_order(Msgs) ->
|
||||
maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ t_session_init(_) ->
|
|||
Session = emqx_session_mem:create(
|
||||
ClientInfo,
|
||||
ConnInfo,
|
||||
_WillMsg = undefined,
|
||||
emqx_session:get_session_conf(ClientInfo)
|
||||
),
|
||||
?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)),
|
||||
|
@ -531,6 +532,7 @@ session(InitFields) when is_map(InitFields) ->
|
|||
Session = emqx_session_mem:create(
|
||||
ClientInfo,
|
||||
ConnInfo,
|
||||
_WillMsg = undefined,
|
||||
emqx_session:get_session_conf(ClientInfo)
|
||||
),
|
||||
maps:fold(
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
-include_lib("emqx/include/asserts.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
|
||||
-define(CNT, 100).
|
||||
-define(SLEEP, 10).
|
||||
|
@ -32,25 +33,15 @@
|
|||
|
||||
all() ->
|
||||
[
|
||||
{group, mqttv3},
|
||||
{group, mqttv5}
|
||||
{group, persistence_disabled},
|
||||
{group, persistence_enabled}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[emqx],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
emqx_logger:set_log_level(debug),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(apps, Config),
|
||||
ok = emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
groups() ->
|
||||
MQTTGroups = [{group, G} || G <- [mqttv3, mqttv5]],
|
||||
[
|
||||
{persistence_enabled, MQTTGroups},
|
||||
{persistence_disabled, MQTTGroups},
|
||||
{mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()},
|
||||
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
|
||||
].
|
||||
|
@ -67,11 +58,55 @@ tc_v5_only() ->
|
|||
t_takeover_session_then_abnormal_disconnect_2
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:clear_screen(),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_group(persistence_enabled = Group, Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx,
|
||||
"session_persistence = {\n"
|
||||
" enable = true\n"
|
||||
" last_alive_update_interval = 100ms\n"
|
||||
" renew_streams_interval = 100ms\n"
|
||||
" session_gc_interval = 2s\n"
|
||||
"}\n"}
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
|
||||
),
|
||||
emqx_logger:set_log_level(debug),
|
||||
[
|
||||
{apps, Apps},
|
||||
{persistence_enabled, true}
|
||||
| Config
|
||||
];
|
||||
init_per_group(persistence_disabled = Group, Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[{emqx, "session_persistence.enable = false"}],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
|
||||
),
|
||||
emqx_logger:set_log_level(debug),
|
||||
[
|
||||
{apps, Apps},
|
||||
{persistence_enabled, false}
|
||||
| Config
|
||||
];
|
||||
init_per_group(mqttv3, Config) ->
|
||||
lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3});
|
||||
init_per_group(mqttv5, Config) ->
|
||||
lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}).
|
||||
|
||||
end_per_group(Group, Config) when
|
||||
Group =:= persistence_disabled;
|
||||
Group =:= persistence_enabled
|
||||
->
|
||||
Apps = ?config(apps, Config),
|
||||
ok = emqx_cth_suite:stop(Apps),
|
||||
ok;
|
||||
end_per_group(_Group, _Config) ->
|
||||
ok.
|
||||
|
||||
|
@ -97,19 +132,34 @@ t_takeover(Config) ->
|
|||
ok = timer:sleep(?SLEEP * 2),
|
||||
meck:passthrough([Arg])
|
||||
end),
|
||||
meck:expect(emqx_cm, takeover_kick, fun(Arg) ->
|
||||
%% trigger more complex takeover conditions during 2-phase takeover protocol:
|
||||
%% when messages are accumulated in 2 processes simultaneously,
|
||||
%% and need to be properly ordered / deduplicated after the protocol commences.
|
||||
ok = timer:sleep(?SLEEP * 2),
|
||||
meck:passthrough([Arg])
|
||||
end),
|
||||
Commands =
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
|
||||
[{fun stop_client/1, []}],
|
||||
lists:flatten([
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
|
||||
{fun maybe_wait_subscriptions/1, []},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
||||
{fun stop_client/1, []}
|
||||
]),
|
||||
|
||||
Sleep =
|
||||
case ?config(persistence_enabled, Config) of
|
||||
true -> 1_500;
|
||||
false -> ?SLEEP
|
||||
end,
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
#{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep},
|
||||
Commands
|
||||
),
|
||||
|
||||
|
@ -117,7 +167,7 @@ t_takeover(Config) ->
|
|||
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
?assertReceive({'EXIT', CPid2, normal}),
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
|
||||
ct:pal("middle: ~p", [Middle]),
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
assert_messages_missed(AllMsgs, Received),
|
||||
|
@ -141,30 +191,36 @@ t_takeover_willmsg(Config) ->
|
|||
{will_qos, 0}
|
||||
],
|
||||
Commands =
|
||||
lists:flatten([
|
||||
%% GIVEN client connect with will message
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun maybe_wait_subscriptions/1, []},
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
%% WHEN client reconnect with clean_start = false
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs]
|
||||
]),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
#{persistence_enabled => ?config(persistence_enabled, Config)},
|
||||
Commands
|
||||
),
|
||||
|
||||
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
|
||||
Sleep =
|
||||
case ?config(persistence_enabled, Config) of
|
||||
true -> 2_000;
|
||||
false -> ?SLEEP
|
||||
end,
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
|
||||
assert_messages_missed(AllMsgs, ReceivedNoWill),
|
||||
|
@ -297,7 +353,7 @@ t_no_takeover_with_delayed_willmsg(Config) ->
|
|||
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||
WillTopic = <<ClientId/binary, "willtopic">>,
|
||||
Client1Msgs = messages(ClientId, 0, 10),
|
||||
WillOpts = [
|
||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||
|
@ -312,24 +368,25 @@ t_no_takeover_with_delayed_willmsg(Config) ->
|
|||
],
|
||||
Commands =
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
lists:flatten(
|
||||
[
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun maybe_wait_subscriptions/1, []},
|
||||
{fun start_client/5, [<<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs]
|
||||
]
|
||||
),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
#{persistence_enabled => ?config(persistence_enabled, Config)},
|
||||
Commands
|
||||
),
|
||||
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
assert_messages_missed(Client1Msgs, Received),
|
||||
{IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>),
|
||||
|
@ -369,15 +426,15 @@ t_session_expire_with_delayed_willmsg(Config) ->
|
|||
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||
],
|
||||
Commands =
|
||||
lists:flatten([
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and delay-interval 10s > session expiry 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs]
|
||||
]),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
|
@ -388,7 +445,7 @@ t_session_expire_with_delayed_willmsg(Config) ->
|
|||
Commands
|
||||
),
|
||||
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
|
||||
?assertNot(IsWill),
|
||||
|
@ -404,7 +461,15 @@ t_session_expire_with_delayed_willmsg(Config) ->
|
|||
?assertNot(IsWill1),
|
||||
?assertEqual([], ReceivedNoWill1),
|
||||
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry.
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
|
||||
SessionSleep =
|
||||
case ?config(persistence_enabled, Config) of
|
||||
true ->
|
||||
%% Session GC uses a larger, safer cutoff time.
|
||||
10_000;
|
||||
false ->
|
||||
5_000
|
||||
end,
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)],
|
||||
{IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>),
|
||||
?assertEqual([], ReceivedNoWill2),
|
||||
?assert(IsWill12),
|
||||
|
@ -493,14 +558,12 @@ t_takeover_before_session_expire(Config) ->
|
|||
Commands =
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and delay-interval 10s > session expiry 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
lists:flatten([
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[
|
||||
]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
%% avoid two clients race for session takeover
|
||||
{
|
||||
fun(CTX) ->
|
||||
|
@ -508,10 +571,10 @@ t_takeover_before_session_expire(Config) ->
|
|||
CTX
|
||||
end,
|
||||
[]
|
||||
}
|
||||
] ++
|
||||
},
|
||||
%% WHEN: client session is taken over within 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}
|
||||
]),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
|
@ -540,7 +603,7 @@ t_takeover_session_then_normal_disconnect(Config) ->
|
|||
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||
WillTopic = <<ClientId/binary, "willtopic">>,
|
||||
Client1Msgs = messages(ClientId, 0, 10),
|
||||
WillOpts = [
|
||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||
|
@ -552,14 +615,13 @@ t_takeover_session_then_normal_disconnect(Config) ->
|
|||
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||
],
|
||||
Commands =
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
lists:flatten([
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun maybe_wait_subscriptions/1, []},
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[
|
||||
<<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []
|
||||
]},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
%% avoid two clients race for session takeover
|
||||
{
|
||||
fun(CTX) ->
|
||||
|
@ -567,25 +629,28 @@ t_takeover_session_then_normal_disconnect(Config) ->
|
|||
CTX
|
||||
end,
|
||||
[]
|
||||
}
|
||||
] ++
|
||||
},
|
||||
%% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and delay-interval 10s > session expiry 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun maybe_wait_subscriptions/1, []}
|
||||
]),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
#{persistence_enabled => ?config(persistence_enabled, Config)},
|
||||
Commands
|
||||
),
|
||||
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
%% WHEN: client disconnect normally.
|
||||
emqtt:disconnect(CPid2, ?RC_SUCCESS),
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received = Received1 ++ Received2,
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
|
||||
%% THEN: willmsg is not published.
|
||||
|
@ -600,7 +665,7 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
|
|||
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||
WillTopic = <<ClientId/binary, "willtopic">>,
|
||||
Client1Msgs = messages(ClientId, 0, 10),
|
||||
WillOpts = [
|
||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||
|
@ -612,16 +677,13 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
|
|||
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||
],
|
||||
Commands =
|
||||
lists:flatten([
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and will-delay-interval 10s > session expiry 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[
|
||||
<<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []
|
||||
]},
|
||||
%% avoid two clients race for session takeover
|
||||
{
|
||||
fun(CTX) ->
|
||||
|
@ -629,9 +691,10 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
|
|||
CTX
|
||||
end,
|
||||
[]
|
||||
}
|
||||
] ++
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
|
||||
},
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}
|
||||
]),
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
|
@ -643,16 +706,26 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
|
|||
),
|
||||
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
%% WHEN: client disconnect abnormally
|
||||
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
Received = Received1 ++ Received2,
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
|
||||
%% THEN: willmsg is not published before session expiry
|
||||
?assertNot(IsWill),
|
||||
?assertNotEqual([], ReceivedNoWill),
|
||||
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)],
|
||||
{IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
|
||||
SessionSleep =
|
||||
case ?config(persistence_enabled, Config) of
|
||||
true ->
|
||||
%% Session GC uses a larger, safer cutoff time (GC interval x 3)
|
||||
10_000;
|
||||
false ->
|
||||
3_000
|
||||
end,
|
||||
Received3 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)],
|
||||
{IsWill1, ReceivedNoWill1} = filter_payload(Received3, <<"willpayload_delay10">>),
|
||||
%% AND THEN: willmsg is published after session expiry
|
||||
?assert(IsWill1),
|
||||
?assertEqual([], ReceivedNoWill1),
|
||||
|
@ -866,19 +939,39 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) ->
|
|||
end),
|
||||
Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
|
||||
|
||||
maybe_wait_subscriptions(Ctx = #{persistence_enabled := true, client := CPids}) ->
|
||||
ok = do_wait_subscription(CPids),
|
||||
Ctx;
|
||||
maybe_wait_subscriptions(Ctx) ->
|
||||
Ctx.
|
||||
|
||||
do_wait_subscription([]) ->
|
||||
ok;
|
||||
do_wait_subscription([CPid | Rest]) ->
|
||||
try emqtt:subscriptions(CPid) of
|
||||
[] ->
|
||||
ok = timer:sleep(rand:uniform(?SLEEP)),
|
||||
do_wait_subscription([CPid | Rest]);
|
||||
[_ | _] ->
|
||||
do_wait_subscription(Rest)
|
||||
catch
|
||||
exit:{noproc, _} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
kick_client(Ctx, ClientId) ->
|
||||
ok = emqx_cm:kick_session(ClientId),
|
||||
Ctx.
|
||||
|
||||
publish_msg(Ctx, Msg) ->
|
||||
ok = timer:sleep(rand:uniform(?SLEEP)),
|
||||
case emqx:publish(Msg) of
|
||||
case emqx:publish(Msg#message{timestamp = emqx_message:timestamp_now()}) of
|
||||
[] -> publish_msg(Ctx, Msg);
|
||||
[_ | _] -> Ctx
|
||||
end.
|
||||
|
||||
stop_client(Ctx = #{client := [CPid | _]}) ->
|
||||
ok = timer:sleep(?SLEEP),
|
||||
stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
|
||||
ok = timer:sleep(Sleep),
|
||||
ok = emqtt:stop(CPid),
|
||||
Ctx.
|
||||
|
||||
|
@ -904,14 +997,18 @@ assert_messages_missed(Ls1, Ls2) ->
|
|||
error
|
||||
end.
|
||||
|
||||
assert_messages_order([], []) ->
|
||||
assert_messages_order([] = _Expected, _Received) ->
|
||||
ok;
|
||||
assert_messages_order([Msg | Expected], Received) ->
|
||||
%% Account for duplicate messages:
|
||||
case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of
|
||||
{[], [#{payload := Mismatch} | _]} ->
|
||||
{[], [#{timestamp := TSMismatch, payload := Mismatch} | _]} ->
|
||||
ct:fail("Message order is not correct, expected: ~p, received: ~p", [
|
||||
emqx_message:payload(Msg), Mismatch
|
||||
{
|
||||
emqx_utils_calendar:epoch_to_rfc3339(emqx_message:timestamp(Msg)),
|
||||
emqx_message:payload(Msg)
|
||||
},
|
||||
{emqx_utils_calendar:epoch_to_rfc3339(TSMismatch), Mismatch}
|
||||
]),
|
||||
error;
|
||||
{_Matching, Rest} ->
|
||||
|
|
|
@ -613,7 +613,8 @@ channel(InitFields) ->
|
|||
},
|
||||
Session = emqx_session:create(
|
||||
ClientInfo,
|
||||
#{receive_maximum => 0, expiry_interval => 0}
|
||||
#{receive_maximum => 0, expiry_interval => 0},
|
||||
_WillMsg = undefined
|
||||
),
|
||||
maps:fold(
|
||||
fun(Field, Value, Channel) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_eviction_agent, [
|
||||
{description, "EMQX Eviction Agent"},
|
||||
{vsn, "5.1.5"},
|
||||
{vsn, "5.1.6"},
|
||||
{registered, [
|
||||
emqx_eviction_agent_sup,
|
||||
emqx_eviction_agent,
|
||||
|
|
|
@ -27,12 +27,15 @@
|
|||
evict_connections/1,
|
||||
evict_sessions/2,
|
||||
evict_sessions/3,
|
||||
purge_sessions/1,
|
||||
evict_session_channel/3
|
||||
purge_sessions/1
|
||||
]).
|
||||
|
||||
%% RPC targets
|
||||
-export([all_local_channels_count/0]).
|
||||
-export([
|
||||
all_local_channels_count/0,
|
||||
evict_session_channel/3,
|
||||
do_evict_session_channel_v3/4
|
||||
]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
|
@ -397,12 +400,23 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
|
|||
Res
|
||||
end.
|
||||
|
||||
%% RPC target for `emqx_eviction_agent_proto_v2'
|
||||
-spec evict_session_channel(
|
||||
emqx_types:clientid(),
|
||||
emqx_types:conninfo(),
|
||||
emqx_types:clientinfo()
|
||||
) -> supervisor:startchild_ret().
|
||||
evict_session_channel(ClientId, ConnInfo, ClientInfo) ->
|
||||
do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, _WillMsg = undefined).
|
||||
|
||||
%% RPC target for `emqx_eviction_agent_proto_v3'
|
||||
-spec do_evict_session_channel_v3(
|
||||
emqx_types:clientid(),
|
||||
emqx_types:conninfo(),
|
||||
emqx_types:clientinfo(),
|
||||
emqx_maybe:t(emqx_types:message())
|
||||
) -> supervisor:startchild_ret().
|
||||
do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, MaybeWillMsg) ->
|
||||
?SLOG(info, #{
|
||||
msg => "evict_session_channel",
|
||||
client_id => ClientId,
|
||||
|
@ -412,7 +426,8 @@ evict_session_channel(ClientId, ConnInfo, ClientInfo) ->
|
|||
Result = emqx_eviction_agent_channel:start_supervised(
|
||||
#{
|
||||
conninfo => ConnInfo,
|
||||
clientinfo => ClientInfo
|
||||
clientinfo => ClientInfo,
|
||||
will_message => MaybeWillMsg
|
||||
}
|
||||
),
|
||||
?SLOG(
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
|
||||
-type opts() :: #{
|
||||
conninfo := emqx_types:conninfo(),
|
||||
clientinfo := emqx_types:clientinfo()
|
||||
clientinfo := emqx_types:clientinfo(),
|
||||
will_message => emqx_maybe:t(emqx_types:message())
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -81,11 +82,12 @@ stop(Pid) ->
|
|||
%% gen_server API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo}]) ->
|
||||
init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo} = Opts]) ->
|
||||
process_flag(trap_exit, true),
|
||||
ClientInfo = clientinfo(OldClientInfo),
|
||||
ConnInfo = conninfo(OldConnInfo),
|
||||
case open_session(ConnInfo, ClientInfo) of
|
||||
MaybeWillMsg = maps:get(will_message, Opts, undefined),
|
||||
case open_session(ConnInfo, ClientInfo, MaybeWillMsg) of
|
||||
{ok, Channel0} ->
|
||||
case set_expiry_timer(Channel0) of
|
||||
{ok, Channel1} ->
|
||||
|
@ -221,9 +223,9 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
|
|||
{error, should_be_expired}
|
||||
end.
|
||||
|
||||
open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
|
||||
open_session(ConnInfo, #{clientid := ClientId} = ClientInfo, MaybeWillMsg) ->
|
||||
Channel = channel(ConnInfo, ClientInfo),
|
||||
case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of
|
||||
case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo, MaybeWillMsg) of
|
||||
{ok, #{present := false}} ->
|
||||
?SLOG(
|
||||
info,
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
deprecated_since/0,
|
||||
|
||||
evict_session_channel/4,
|
||||
|
||||
|
@ -20,6 +21,9 @@
|
|||
introduced_in() ->
|
||||
"5.2.1".
|
||||
|
||||
deprecated_since() ->
|
||||
"5.7.0".
|
||||
|
||||
-spec evict_session_channel(
|
||||
node(),
|
||||
emqx_types:clientid(),
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_eviction_agent_proto_v3).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
|
||||
all_channels_count/2,
|
||||
|
||||
%% Changed in v3:
|
||||
evict_session_channel/5
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.7.0".
|
||||
|
||||
-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
|
||||
all_channels_count(Nodes, Timeout) ->
|
||||
erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).
|
||||
|
||||
%% Changed in v3:
|
||||
-spec evict_session_channel(
|
||||
node(),
|
||||
emqx_types:clientid(),
|
||||
emqx_types:conninfo(),
|
||||
emqx_types:clientinfo(),
|
||||
emqx_maybe:t(emqx_types:message())
|
||||
) -> supervisor:startchild_err() | emqx_rpc:badrpc().
|
||||
evict_session_channel(Node, ClientId, ConnInfo, ClientInfo, MaybeWillMsg) ->
|
||||
rpc:call(
|
||||
Node,
|
||||
emqx_eviction_agent,
|
||||
do_evict_session_channel_v3,
|
||||
[ClientId, ConnInfo, ClientInfo, MaybeWillMsg]
|
||||
).
|
|
@ -373,10 +373,11 @@ process_connect(
|
|||
Channel = #channel{
|
||||
ctx = Ctx,
|
||||
conninfo = ConnInfo = #{clean_start := CleanStart},
|
||||
clientinfo = ClientInfo
|
||||
clientinfo = ClientInfo,
|
||||
will_msg = MaybeWillMsg
|
||||
}
|
||||
) ->
|
||||
SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end,
|
||||
SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT, MaybeWillMsg) end,
|
||||
case
|
||||
emqx_gateway_ctx:open_session(
|
||||
Ctx,
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
-export([registry/1, set_registry/2]).
|
||||
|
||||
-export([
|
||||
init/1,
|
||||
init/2,
|
||||
info/1,
|
||||
info/2,
|
||||
stats/1
|
||||
|
@ -52,12 +52,12 @@
|
|||
|
||||
-export_type([session/0]).
|
||||
|
||||
init(ClientInfo) ->
|
||||
init(ClientInfo, MaybeWillMsg) ->
|
||||
ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
|
||||
SessionConf = emqx_session:get_session_conf(ClientInfo),
|
||||
#{
|
||||
registry => emqx_mqttsn_registry:init(),
|
||||
session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf)
|
||||
session => emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf)
|
||||
}.
|
||||
|
||||
registry(#{registry := Registry}) ->
|
||||
|
|
Loading…
Reference in New Issue