feat(session): pass will message down to session when creating/opening

This commit is contained in:
Thales Macedo Garitezi 2024-03-19 09:51:55 -03:00
parent 8b963d5960
commit 8c1a1d21a7
22 changed files with 300 additions and 131 deletions

View File

@ -25,6 +25,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
TCApps = emqx_cth_suite:start( TCApps = emqx_cth_suite:start(
app_specs(), app_specs(),
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
@ -244,8 +245,9 @@ t_session_subscription_idempotency(Config) ->
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
ConnInfo = #{peername => {undefined, undefined}}, ConnInfo = #{peername => {undefined, undefined}},
WillMsg = undefined,
Session = erpc:call( Session = erpc:call(
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg]
), ),
?assertMatch( ?assertMatch(
#{SubTopicFilter := #{}}, #{SubTopicFilter := #{}},
@ -321,8 +323,9 @@ t_session_unsubscription_idempotency(Config) ->
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
ConnInfo = #{peername => {undefined, undefined}}, ConnInfo = #{peername => {undefined, undefined}},
WillMsg = undefined,
Session = erpc:call( Session = erpc:call(
Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg]
), ),
?assertEqual( ?assertEqual(
#{}, #{},

View File

@ -26,6 +26,7 @@
{emqx_ds,4}. {emqx_ds,4}.
{emqx_eviction_agent,1}. {emqx_eviction_agent,1}.
{emqx_eviction_agent,2}. {emqx_eviction_agent,2}.
{emqx_eviction_agent,3}.
{emqx_exhook,1}. {emqx_exhook,1}.
{emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_exporter_fs,1}.
{emqx_ft_storage_fs,1}. {emqx_ft_storage_fs,1}.

View File

@ -584,11 +584,12 @@ process_connect(
AckProps, AckProps,
Channel = #channel{ Channel = #channel{
conninfo = ConnInfo, conninfo = ConnInfo,
clientinfo = ClientInfo clientinfo = ClientInfo,
will_msg = MaybeWillMsg
} }
) -> ) ->
#{clean_start := CleanStart} = ConnInfo, #{clean_start := CleanStart} = ConnInfo,
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, MaybeWillMsg) of
{ok, #{session := Session, present := false}} -> {ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session}, NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel)); handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
@ -1019,7 +1020,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn
[ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
AckProps AckProps
), ),
return_connack( return_connack(
?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), ?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
ensure_keepalive(NAckProps, Channel) ensure_keepalive(NAckProps, Channel)
@ -1378,9 +1378,9 @@ handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) ->
handle_timeout( handle_timeout(
_TRef, _TRef,
will_message = TimerName, will_message = TimerName,
Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} Channel = #channel{will_msg = WillMsg}
) -> ) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), (WillMsg =/= undefined) andalso publish_will_msg(Channel),
{ok, clean_timer(TimerName, Channel#channel{will_msg = undefined})}; {ok, clean_timer(TimerName, Channel#channel{will_msg = undefined})};
handle_timeout( handle_timeout(
_TRef, _TRef,
@ -2302,19 +2302,17 @@ maybe_publish_will_msg(
maybe_publish_will_msg( maybe_publish_will_msg(
_Reason, _Reason,
Channel = #channel{ Channel = #channel{
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}
} }
) -> ) ->
%% Unconditionally publish will message for MQTT 3.1.1 %% Unconditionally publish will message for MQTT 3.1.1
?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg), _ = publish_will_msg(Channel),
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Reason,
Channel = #channel{ Channel = #channel{
clientinfo = ClientInfo, conninfo = #{clientid := ClientId}
conninfo = #{clientid := ClientId},
will_msg = WillMsg
} }
) when ) when
Reason =:= expired orelse Reason =:= expired orelse
@ -2332,12 +2330,11 @@ maybe_publish_will_msg(
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled %% OR fired but not yet handled
?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(Channel),
remove_willmsg(Channel); remove_willmsg(Channel);
maybe_publish_will_msg( maybe_publish_will_msg(
takenover, takenover,
Channel = #channel{ Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg, will_msg = WillMsg,
conninfo = #{clientid := ClientId} conninfo = #{clientid := ClientId}
} }
@ -2355,7 +2352,7 @@ maybe_publish_will_msg(
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg); _ = publish_will_msg(Channel);
I when I > 0 -> I when I > 0 ->
%% @NOTE Non-normative comment in MQTT 5.0 spec %% @NOTE Non-normative comment in MQTT 5.0 spec
%% """ %% """
@ -2370,7 +2367,6 @@ maybe_publish_will_msg(
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Reason,
Channel = #channel{ Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg, will_msg = WillMsg,
conninfo = #{clientid := ClientId} conninfo = #{clientid := ClientId}
} }
@ -2381,7 +2377,7 @@ maybe_publish_will_msg(
?tp(debug, maybe_publish_will_msg_other_publish, #{ ?tp(debug, maybe_publish_will_msg_other_publish, #{
clientid => ClientId, reason => Reason clientid => ClientId, reason => Reason
}), }),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(Channel),
remove_willmsg(Channel); remove_willmsg(Channel);
I when I > 0 -> I when I > 0 ->
?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
@ -2396,8 +2392,11 @@ will_delay_interval(WillMsg) ->
). ).
publish_will_msg( publish_will_msg(
ClientInfo = #{mountpoint := MountPoint}, #channel{
Msg = #message{topic = Topic} session = Session,
clientinfo = ClientInfo = #{mountpoint := MountPoint},
will_msg = Msg = #message{topic = Topic}
}
) -> ) ->
Action = authz_action(Msg), Action = authz_action(Msg),
PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
@ -2417,7 +2416,7 @@ publish_will_msg(
false -> false ->
NMsg = emqx_mountpoint:mount(MountPoint, Msg), NMsg = emqx_mountpoint:mount(MountPoint, Msg),
NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)}, NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)},
_ = emqx_broker:publish(NMsg2), ok = emqx_session:publish_will_message(Session, NMsg2),
ok ok
end. end.

View File

@ -47,7 +47,7 @@
]). ]).
-export([ -export([
open_session/3, open_session/4,
discard_session/1, discard_session/1,
discard_session/2, discard_session/2,
takeover_session_begin/1, takeover_session_begin/1,
@ -110,6 +110,8 @@
chan_pid/0 chan_pid/0
]). ]).
-type message() :: emqx_types:message().
-type chan_pid() :: pid(). -type chan_pid() :: pid().
-type channel_info() :: { -type channel_info() :: {
@ -266,24 +268,29 @@ set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) ->
end. end.
%% @doc Open a session. %% @doc Open a session.
-spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) -> -spec open_session(
_CleanStart :: boolean(),
emqx_types:clientinfo(),
emqx_types:conninfo(),
emqx_maybe:t(message())
) ->
{ok, #{ {ok, #{
session := emqx_session:t(), session := emqx_session:t(),
present := boolean(), present := boolean(),
replay => _ReplayContext replay => _ReplayContext
}} }}
| {error, Reason :: term()}. | {error, Reason :: term()}.
open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
Self = self(), Self = self(),
emqx_cm_locker:trans(ClientId, fun(_) -> emqx_cm_locker:trans(ClientId, fun(_) ->
ok = discard_session(ClientId), ok = discard_session(ClientId),
ok = emqx_session:destroy(ClientInfo, ConnInfo), ok = emqx_session:destroy(ClientInfo, ConnInfo),
create_register_session(ClientInfo, ConnInfo, Self) create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, Self)
end); end);
open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
Self = self(), Self = self(),
emqx_cm_locker:trans(ClientId, fun(_) -> emqx_cm_locker:trans(ClientId, fun(_) ->
case emqx_session:open(ClientInfo, ConnInfo) of case emqx_session:open(ClientInfo, ConnInfo, MaybeWillMsg) of
{true, Session, ReplayContext} -> {true, Session, ReplayContext} ->
ok = register_channel(ClientId, Self, ConnInfo), ok = register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => true, replay => ReplayContext}}; {ok, #{session => Session, present => true, replay => ReplayContext}};
@ -293,8 +300,8 @@ open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo
end end
end). end).
create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) -> create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) ->
Session = emqx_session:create(ClientInfo, ConnInfo), Session = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg),
ok = register_channel(ClientId, ChanPid, ConnInfo), ok = register_channel(ClientId, ChanPid, ConnInfo),
{ok, #{session => Session, present => false}}. {ok, #{session => Session, present => false}}.

View File

@ -34,8 +34,8 @@
%% Session API %% Session API
-export([ -export([
create/3, create/4,
open/3, open/4,
destroy/1 destroy/1
]). ]).
@ -66,6 +66,11 @@
terminate/2 terminate/2
]). ]).
%% Will message handling
-export([
publish_will_message/2
]).
%% Managment APIs: %% Managment APIs:
-export([ -export([
list_client_subscriptions/1 list_client_subscriptions/1
@ -88,7 +93,7 @@
-ifdef(TEST). -ifdef(TEST).
-export([ -export([
session_open/2, session_open/3,
list_all_sessions/0 list_all_sessions/0
]). ]).
-endif. -endif.
@ -155,6 +160,7 @@
-type stream_state() :: #srs{}. -type stream_state() :: #srs{}.
-type message() :: emqx_types:message().
-type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type timestamp() :: emqx_utils_calendar:epoch_millisecond().
-type millisecond() :: non_neg_integer(). -type millisecond() :: non_neg_integer().
-type clientinfo() :: emqx_types:clientinfo(). -type clientinfo() :: emqx_types:clientinfo().
@ -181,14 +187,14 @@
%% %%
-spec create(clientinfo(), conninfo(), emqx_session:conf()) -> -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
session(). session().
create(#{clientid := ClientID}, ConnInfo, Conf) -> create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) ->
ensure_timers(session_ensure_new(ClientID, ConnInfo, Conf)). ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)).
-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
{_IsPresent :: true, session(), []} | false. {_IsPresent :: true, session(), []} | false.
open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
%% NOTE %% NOTE
%% The fact that we need to concern about discarding all live channels here %% The fact that we need to concern about discarding all live channels here
%% is essentially a consequence of the in-memory session design, where we %% is essentially a consequence of the in-memory session design, where we
@ -196,7 +202,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) ->
%% somehow isolate those idling not-yet-expired sessions into a separate process %% somehow isolate those idling not-yet-expired sessions into a separate process
%% space, and move this call back into `emqx_cm` where it belongs. %% space, and move this call back into `emqx_cm` where it belongs.
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
case session_open(ClientID, ConnInfo) of case session_open(ClientID, ConnInfo, MaybeWillMsg) of
Session0 = #{} -> Session0 = #{} ->
Session = Session0#{props => Conf}, Session = Session0#{props => Conf},
{true, ensure_timers(Session), []}; {true, ensure_timers(Session), []};
@ -679,9 +685,9 @@ sync(ClientId) ->
%% %%
%% Note: session API doesn't handle session takeovers, it's the job of %% Note: session API doesn't handle session takeovers, it's the job of
%% the broker. %% the broker.
-spec session_open(id(), emqx_types:conninfo()) -> -spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
session() | false. session() | false.
session_open(SessionId, NewConnInfo) -> session_open(SessionId, NewConnInfo, MaybeWillMsg) ->
NowMS = now_ms(), NowMS = now_ms(),
case emqx_persistent_session_ds_state:open(SessionId) of case emqx_persistent_session_ds_state:open(SessionId) of
{ok, S0} -> {ok, S0} ->
@ -699,7 +705,8 @@ session_open(SessionId, NewConnInfo) ->
S3 = emqx_persistent_session_ds_state:set_peername( S3 = emqx_persistent_session_ds_state:set_peername(
maps:get(peername, NewConnInfo), S2 maps:get(peername, NewConnInfo), S2
), ),
S = emqx_persistent_session_ds_state:commit(S3), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
S = emqx_persistent_session_ds_state:commit(S4),
Inflight = emqx_persistent_session_ds_inflight:new( Inflight = emqx_persistent_session_ds_inflight:new(
receive_maximum(NewConnInfo) receive_maximum(NewConnInfo)
), ),
@ -714,9 +721,14 @@ session_open(SessionId, NewConnInfo) ->
false false
end. end.
-spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> -spec session_ensure_new(
id(),
emqx_types:conninfo(),
emqx_maybe:t(message()),
emqx_session:conf()
) ->
session(). session().
session_ensure_new(Id, ConnInfo, Conf) -> session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) ->
?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
Now = now_ms(), Now = now_ms(),
S0 = emqx_persistent_session_ds_state:create_new(Id), S0 = emqx_persistent_session_ds_state:create_new(Id),
@ -738,7 +750,8 @@ session_ensure_new(Id, ConnInfo, Conf) ->
?committed(?QOS_2) ?committed(?QOS_2)
] ]
), ),
S = emqx_persistent_session_ds_state:commit(S4), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
S = emqx_persistent_session_ds_state:commit(S5),
#{ #{
id => Id, id => Id,
props => Conf, props => Conf,
@ -1191,6 +1204,15 @@ seqno_diff(?QOS_1, A, B) ->
seqno_diff(?QOS_2, A, B) -> seqno_diff(?QOS_2, A, B) ->
A - B. A - B.
%%--------------------------------------------------------------------
%% Will message handling
%%--------------------------------------------------------------------
-spec publish_will_message(session(), message()) -> ok.
publish_will_message(_Session, #message{} = _WillMsg) ->
%% TODO
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Tests %% Tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -75,5 +75,6 @@
%% Unique integer used to create unique identities %% Unique integer used to create unique identities
-define(last_id, last_id). -define(last_id, last_id).
-define(peername, peername). -define(peername, peername).
-define(will_message, will_message).
-endif. -endif.

View File

@ -30,6 +30,7 @@
-export([get_created_at/1, set_created_at/2]). -export([get_created_at/1, set_created_at/2]).
-export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]).
-export([get_expiry_interval/1, set_expiry_interval/2]). -export([get_expiry_interval/1, set_expiry_interval/2]).
-export([get_will_message/1, set_will_message/2]).
-export([get_peername/1, set_peername/2]). -export([get_peername/1, set_peername/2]).
-export([new_id/1]). -export([new_id/1]).
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
@ -58,6 +59,8 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-type message() :: emqx_types:message().
-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). -type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).
-opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
@ -288,6 +291,14 @@ get_peername(Rec) ->
set_peername(Val, Rec) -> set_peername(Val, Rec) ->
set_meta(?peername, Val, Rec). set_meta(?peername, Val, Rec).
-spec get_will_message(t()) -> emqx_maybe:t(message()).
get_will_message(Rec) ->
get_meta(?will_message, Rec).
-spec set_will_message(emqx_maybe:t(message()), t()) -> t().
set_will_message(Val, Rec) ->
set_meta(?will_message, Val, Rec).
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
new_id(Rec) -> new_id(Rec) ->
LastId = LastId =

View File

@ -55,8 +55,8 @@
-endif. -endif.
-export([ -export([
create/2, create/3,
open/2, open/3,
destroy/1, destroy/1,
destroy/2 destroy/2
]). ]).
@ -88,6 +88,11 @@
terminate/3 terminate/3
]). ]).
%% Will message handling
-export([
publish_will_message/2
]).
% Timers % Timers
-export([ -export([
ensure_timer/3, ensure_timer/3,
@ -175,57 +180,57 @@
%% Behaviour %% Behaviour
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-callback create(clientinfo(), conninfo(), conf()) -> -callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
t(). t().
-callback open(clientinfo(), conninfo(), conf()) -> -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
{_IsPresent :: true, t(), _ReplayContext} | false. {_IsPresent :: true, t(), _ReplayContext} | false.
-callback destroy(t() | clientinfo()) -> ok. -callback destroy(t() | clientinfo()) -> ok.
-callback publish_will_message(t(), message()) -> ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Create a Session %% Create a Session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec create(clientinfo(), conninfo()) -> t(). -spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) -> t().
create(ClientInfo, ConnInfo) -> create(ClientInfo, ConnInfo, MaybeWillMsg) ->
Conf = get_session_conf(ClientInfo), Conf = get_session_conf(ClientInfo),
create(ClientInfo, ConnInfo, Conf).
create(ClientInfo, ConnInfo, Conf) ->
% FIXME error conditions % FIXME error conditions
create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf). create(
hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, MaybeWillMsg, Conf
).
create(Mod, ClientInfo, ConnInfo, Conf) -> create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
% FIXME error conditions % FIXME error conditions
Session = Mod:create(ClientInfo, ConnInfo, Conf), Session = Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf),
ok = emqx_metrics:inc('session.created'), ok = emqx_metrics:inc('session.created'),
ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]), ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]),
Session. Session.
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) ->
{_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
open(ClientInfo, ConnInfo) -> open(ClientInfo, ConnInfo, MaybeWillMsg) ->
Conf = get_session_conf(ClientInfo), Conf = get_session_conf(ClientInfo),
Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo), Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo),
%% NOTE %% NOTE
%% Try to look the existing session up in session stores corresponding to the given %% Try to look the existing session up in session stores corresponding to the given
%% `Mods` in order, starting from the last one. %% `Mods` in order, starting from the last one.
case try_open(Mods, ClientInfo, ConnInfo, Conf) of case try_open(Mods, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
{_IsPresent = true, _, _} = Present -> {_IsPresent = true, _, _} = Present ->
Present; Present;
false -> false ->
%% NOTE %% NOTE
%% Nothing was found, create a new session with the `Default` implementation. %% Nothing was found, create a new session with the `Default` implementation.
{false, create(Default, ClientInfo, ConnInfo, Conf)} {false, create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf)}
end. end.
try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) -> try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
case try_open(Rest, ClientInfo, ConnInfo, Conf) of case try_open(Rest, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
{_IsPresent = true, _, _} = Present -> {_IsPresent = true, _, _} = Present ->
Present; Present;
false -> false ->
Mod:open(ClientInfo, ConnInfo, Conf) Mod:open(ClientInfo, ConnInfo, MaybeWillMsg, Conf)
end; end;
try_open([], _ClientInfo, _ConnInfo, _Conf) -> try_open([], _ClientInfo, _ConnInfo, _MaybeWillMsg, _Conf) ->
false. false.
-spec get_session_conf(clientinfo()) -> conf(). -spec get_session_conf(clientinfo()) -> conf().
@ -635,3 +640,11 @@ choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) ->
run_hook(Name, Args) -> run_hook(Name, Args) ->
ok = emqx_metrics:inc(Name), ok = emqx_metrics:inc(Name),
emqx_hooks:run(Name, Args). emqx_hooks:run(Name, Args).
%%--------------------------------------------------------------------
%% Will message handling
%%--------------------------------------------------------------------
-spec publish_will_message(t(), message()) -> ok.
publish_will_message(Session, WillMsg) ->
?IMPL(Session):publish_will_message(Session, WillMsg).

View File

@ -58,8 +58,8 @@
-endif. -endif.
-export([ -export([
create/3, create/4,
open/3, open/4,
destroy/1 destroy/1
]). ]).
@ -106,6 +106,11 @@
dedup/4 dedup/4
]). ]).
%% Will message handling
-export([
publish_will_message/2
]).
%% Export for CT %% Export for CT
-export([set_field/3]). -export([set_field/3]).
@ -127,6 +132,7 @@
-type session() :: #session{}. -type session() :: #session{}.
-type replayctx() :: [emqx_types:message()]. -type replayctx() :: [emqx_types:message()].
-type message() :: emqx_types:message().
-type clientinfo() :: emqx_types:clientinfo(). -type clientinfo() :: emqx_types:clientinfo().
-type conninfo() :: emqx_session:conninfo(). -type conninfo() :: emqx_session:conninfo().
-type replies() :: emqx_session:replies(). -type replies() :: emqx_session:replies().
@ -151,11 +157,12 @@
%% Init a Session %% Init a Session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec create(clientinfo(), conninfo(), emqx_session:conf()) -> -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
session(). session().
create( create(
#{zone := Zone, clientid := ClientId}, #{zone := Zone, clientid := ClientId},
#{expiry_interval := EI, receive_maximum := ReceiveMax}, #{expiry_interval := EI, receive_maximum := ReceiveMax},
_MaybeWillMsg,
Conf Conf
) -> ) ->
QueueOpts = get_mqueue_conf(Zone), QueueOpts = get_mqueue_conf(Zone),
@ -200,9 +207,9 @@ destroy(_Session) ->
%% Open a (possibly existing) Session %% Open a (possibly existing) Session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
{_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false.
open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> open(ClientInfo = #{clientid := ClientId}, ConnInfo, _MaybeWillMsg, Conf) ->
case emqx_cm:takeover_session_begin(ClientId) of case emqx_cm:takeover_session_begin(ClientId) of
{ok, SessionRemote, TakeoverState} -> {ok, SessionRemote, TakeoverState} ->
Session0 = resume(ClientInfo, SessionRemote), Session0 = resume(ClientInfo, SessionRemote),
@ -797,6 +804,15 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
next_pkt_id(Session = #session{next_pkt_id = Id}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) ->
Session#session{next_pkt_id = Id + 1}. Session#session{next_pkt_id = Id + 1}.
%%--------------------------------------------------------------------
%% Will message handling
%%--------------------------------------------------------------------
-spec publish_will_message(session(), message()) -> ok.
publish_will_message(#session{}, #message{} = WillMsg) ->
_ = emqx_broker:publish(WillMsg),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -288,13 +288,7 @@ t_handle_in_puback_id_not_found(_) ->
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)). % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
t_bad_receive_maximum(_) -> t_bad_receive_maximum(_) ->
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
{shutdown, protocol_error, _, _} = {shutdown, protocol_error, _, _} =
@ -304,13 +298,7 @@ t_bad_receive_maximum(_) ->
). ).
t_override_client_receive_maximum(_) -> t_override_client_receive_maximum(_) ->
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
@ -460,13 +448,7 @@ t_handle_in_expected_packet(_) ->
emqx_channel:handle_in(packet, channel()). emqx_channel:handle_in(packet, channel()).
t_process_connect(_) -> t_process_connect(_) ->
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
emqx_channel:process_connect(#{}, channel(#{conn_state => idle})). emqx_channel:process_connect(#{}, channel(#{conn_state => idle})).
@ -604,13 +586,7 @@ t_handle_out_connack_sucess(_) ->
?assertEqual(connected, emqx_channel:info(conn_state, Channel)). ?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
t_handle_out_connack_response_information(_) -> t_handle_out_connack_response_information(_) ->
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, {ok,
@ -624,13 +600,7 @@ t_handle_out_connack_response_information(_) ->
). ).
t_handle_out_connack_not_response_information(_) -> t_handle_out_connack_not_response_information(_) ->
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
@ -1017,13 +987,7 @@ t_ws_cookie_init(_) ->
t_flapping_detect(_) -> t_flapping_detect(_) ->
emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000), emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000),
Parent = self(), Parent = self(),
ok = meck:expect( mock_cm_open_session(),
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
IdleChannel = channel( IdleChannel = channel(
clientinfo(#{ clientinfo(#{
@ -1128,7 +1092,8 @@ session(ClientInfo, InitFields) when is_map(InitFields) ->
#{ #{
receive_maximum => 0, receive_maximum => 0,
expiry_interval => 0 expiry_interval => 0
} },
_WillMsg = undefined
), ),
maps:fold( maps:fold(
fun(Field, Value, SessionAcc) -> fun(Field, Value, SessionAcc) ->
@ -1139,6 +1104,15 @@ session(ClientInfo, InitFields) when is_map(InitFields) ->
InitFields InitFields
). ).
mock_cm_open_session() ->
ok = meck:expect(
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo, _MaybeWillMsg) ->
{ok, #{session => session(), present => false}}
end
).
%% conn: 5/s; overall: 10/s %% conn: 5/s; overall: 10/s
quota() -> quota() ->
emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()). emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).

View File

@ -59,6 +59,13 @@ init_per_suite(Config) ->
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)). emqx_cth_suite:stop(proplists:get_value(apps, Config)).
%%--------------------------------------------------------------------
%% Helper fns
%%--------------------------------------------------------------------
open_session(CleanStart, ClientInfo, ConnInfo) ->
emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, _WillMsg = undefined).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% TODO: Add more test cases %% TODO: Add more test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -129,10 +136,10 @@ t_open_session(_) ->
receive_maximum => 100 receive_maximum => 100
}, },
{ok, #{session := Session1, present := false}} = {ok, #{session := Session1, present := false}} =
emqx_cm:open_session(true, ClientInfo, ConnInfo), open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session1)), ?assertEqual(100, emqx_session:info(inflight_max, Session1)),
{ok, #{session := Session2, present := false}} = {ok, #{session := Session2, present := false}} =
emqx_cm:open_session(true, ClientInfo, ConnInfo), open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session2)), ?assertEqual(100, emqx_session:info(inflight_max, Session2)),
emqx_cm:unregister_channel(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>),
@ -163,7 +170,7 @@ t_open_session_race_condition(_) ->
Parent = self(), Parent = self(),
OpenASession = fun() -> OpenASession = fun() ->
timer:sleep(rand:uniform(100)), timer:sleep(rand:uniform(100)),
OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)), OpenR = open_session(true, ClientInfo, ConnInfo),
Parent ! OpenR, Parent ! OpenR,
case OpenR of case OpenR of
{ok, _} -> {ok, _} ->

View File

@ -679,7 +679,8 @@ channel(InitFields) ->
}, },
Session = emqx_session:create( Session = emqx_session:create(
ClientInfo, ClientInfo,
#{receive_maximum => 0, expiry_interval => 1000} #{receive_maximum => 0, expiry_interval => 1000},
_WillMsg = undefined
), ),
maps:fold( maps:fold(
fun(Field, Value, Channel) -> fun(Field, Value, Channel) ->

View File

@ -21,6 +21,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -1240,6 +1241,49 @@ t_multiple_subscription_matches(Config) ->
?assertEqual({ok, 2}, maps:find(qos, Msg2)), ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
ok = emqtt:disconnect(Client2). ok = emqtt:disconnect(Client2).
%% Check that we get a single will message when the client disconnects with a non
%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
%% QoS = 1.
t_will_message1(Config) ->
ConnFun = ?config(conn_fun, Config),
WillTopic = ?config(topic, Config),
WillPayload = <<"will message">>,
ClientId = ?config(client_id, Config),
ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000),
?check_trace(
#{timetrap => 15_000},
begin
ok = emqx:subscribe(WillTopic, #{qos => 2}),
{ok, Client} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 1}},
{will_topic, WillTopic},
{will_payload, WillPayload},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 1}}
| Config
]),
{{ok, _}, {ok, _}} =
?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}),
ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000),
%% No duplicates
?assertNotReceive({deliver, WillTopic, _}, 1_000),
ok
end,
[]
),
ok.
t_will_message1(init, Config) ->
Config;
t_will_message1('end', _Config) ->
ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}),
ok.
get_topicwise_order(Msgs) -> get_topicwise_order(Msgs) ->
maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
@ -1267,3 +1311,7 @@ pick_respective_msgs(MsgRefs, Msgs) ->
debug_info(ClientId) -> debug_info(ClientId) ->
Info = emqx_persistent_session_ds:print_session(ClientId), Info = emqx_persistent_session_ds:print_session(ClientId),
ct:pal("*** State:~n~p", [Info]). ct:pal("*** State:~n~p", [Info]).
on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) ->
?tp(client_connack, #{}),
ok.

View File

@ -67,6 +67,7 @@ t_session_init(_) ->
Session = emqx_session_mem:create( Session = emqx_session_mem:create(
ClientInfo, ClientInfo,
ConnInfo, ConnInfo,
_WillMsg = undefined,
emqx_session:get_session_conf(ClientInfo) emqx_session:get_session_conf(ClientInfo)
), ),
?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)),
@ -531,6 +532,7 @@ session(InitFields) when is_map(InitFields) ->
Session = emqx_session_mem:create( Session = emqx_session_mem:create(
ClientInfo, ClientInfo,
ConnInfo, ConnInfo,
_WillMsg = undefined,
emqx_session:get_session_conf(ClientInfo) emqx_session:get_session_conf(ClientInfo)
), ),
maps:fold( maps:fold(

View File

@ -613,7 +613,8 @@ channel(InitFields) ->
}, },
Session = emqx_session:create( Session = emqx_session:create(
ClientInfo, ClientInfo,
#{receive_maximum => 0, expiry_interval => 0} #{receive_maximum => 0, expiry_interval => 0},
_WillMsg = undefined
), ),
maps:fold( maps:fold(
fun(Field, Value, Channel) -> fun(Field, Value, Channel) ->

View File

@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [ {application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"}, {description, "EMQX Eviction Agent"},
{vsn, "5.1.5"}, {vsn, "5.1.6"},
{registered, [ {registered, [
emqx_eviction_agent_sup, emqx_eviction_agent_sup,
emqx_eviction_agent, emqx_eviction_agent,

View File

@ -27,12 +27,15 @@
evict_connections/1, evict_connections/1,
evict_sessions/2, evict_sessions/2,
evict_sessions/3, evict_sessions/3,
purge_sessions/1, purge_sessions/1
evict_session_channel/3
]). ]).
%% RPC targets %% RPC targets
-export([all_local_channels_count/0]). -export([
all_local_channels_count/0,
evict_session_channel/3,
do_evict_session_channel_v3/4
]).
-behaviour(gen_server). -behaviour(gen_server).
@ -397,12 +400,23 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
Res Res
end. end.
%% RPC target for `emqx_eviction_agent_proto_v2'
-spec evict_session_channel( -spec evict_session_channel(
emqx_types:clientid(), emqx_types:clientid(),
emqx_types:conninfo(), emqx_types:conninfo(),
emqx_types:clientinfo() emqx_types:clientinfo()
) -> supervisor:startchild_ret(). ) -> supervisor:startchild_ret().
evict_session_channel(ClientId, ConnInfo, ClientInfo) -> evict_session_channel(ClientId, ConnInfo, ClientInfo) ->
do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, _WillMsg = undefined).
%% RPC target for `emqx_eviction_agent_proto_v3'
-spec do_evict_session_channel_v3(
emqx_types:clientid(),
emqx_types:conninfo(),
emqx_types:clientinfo(),
emqx_maybe:t(emqx_types:message())
) -> supervisor:startchild_ret().
do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, MaybeWillMsg) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "evict_session_channel", msg => "evict_session_channel",
client_id => ClientId, client_id => ClientId,
@ -412,7 +426,8 @@ evict_session_channel(ClientId, ConnInfo, ClientInfo) ->
Result = emqx_eviction_agent_channel:start_supervised( Result = emqx_eviction_agent_channel:start_supervised(
#{ #{
conninfo => ConnInfo, conninfo => ConnInfo,
clientinfo => ClientInfo clientinfo => ClientInfo,
will_message => MaybeWillMsg
} }
), ),
?SLOG( ?SLOG(

View File

@ -32,7 +32,8 @@
-type opts() :: #{ -type opts() :: #{
conninfo := emqx_types:conninfo(), conninfo := emqx_types:conninfo(),
clientinfo := emqx_types:clientinfo() clientinfo := emqx_types:clientinfo(),
will_message => emqx_maybe:t(emqx_types:message())
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -81,11 +82,12 @@ stop(Pid) ->
%% gen_server API %% gen_server API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo}]) -> init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo} = Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientInfo = clientinfo(OldClientInfo), ClientInfo = clientinfo(OldClientInfo),
ConnInfo = conninfo(OldConnInfo), ConnInfo = conninfo(OldConnInfo),
case open_session(ConnInfo, ClientInfo) of MaybeWillMsg = maps:get(will_message, Opts, undefined),
case open_session(ConnInfo, ClientInfo, MaybeWillMsg) of
{ok, Channel0} -> {ok, Channel0} ->
case set_expiry_timer(Channel0) of case set_expiry_timer(Channel0) of
{ok, Channel1} -> {ok, Channel1} ->
@ -221,9 +223,9 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
{error, should_be_expired} {error, should_be_expired}
end. end.
open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> open_session(ConnInfo, #{clientid := ClientId} = ClientInfo, MaybeWillMsg) ->
Channel = channel(ConnInfo, ClientInfo), Channel = channel(ConnInfo, ClientInfo),
case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo, MaybeWillMsg) of
{ok, #{present := false}} -> {ok, #{present := false}} ->
?SLOG( ?SLOG(
info, info,

View File

@ -8,6 +8,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
evict_session_channel/4, evict_session_channel/4,
@ -20,6 +21,9 @@
introduced_in() -> introduced_in() ->
"5.2.1". "5.2.1".
deprecated_since() ->
"5.7.0".
-spec evict_session_channel( -spec evict_session_channel(
node(), node(),
emqx_types:clientid(), emqx_types:clientid(),

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_eviction_agent_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
all_channels_count/2,
%% Changed in v3:
evict_session_channel/5
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.7.0".
-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
all_channels_count(Nodes, Timeout) ->
erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).
%% Changed in v3:
-spec evict_session_channel(
node(),
emqx_types:clientid(),
emqx_types:conninfo(),
emqx_types:clientinfo(),
emqx_maybe:t(emqx_types:message())
) -> supervisor:startchild_err() | emqx_rpc:badrpc().
evict_session_channel(Node, ClientId, ConnInfo, ClientInfo, MaybeWillMsg) ->
rpc:call(
Node,
emqx_eviction_agent,
do_evict_session_channel_v3,
[ClientId, ConnInfo, ClientInfo, MaybeWillMsg]
).

View File

@ -373,10 +373,11 @@ process_connect(
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
conninfo = ConnInfo = #{clean_start := CleanStart}, conninfo = ConnInfo = #{clean_start := CleanStart},
clientinfo = ClientInfo clientinfo = ClientInfo,
will_msg = MaybeWillMsg
} }
) -> ) ->
SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end, SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT, MaybeWillMsg) end,
case case
emqx_gateway_ctx:open_session( emqx_gateway_ctx:open_session(
Ctx, Ctx,

View File

@ -19,7 +19,7 @@
-export([registry/1, set_registry/2]). -export([registry/1, set_registry/2]).
-export([ -export([
init/1, init/2,
info/1, info/1,
info/2, info/2,
stats/1 stats/1
@ -52,12 +52,12 @@
-export_type([session/0]). -export_type([session/0]).
init(ClientInfo) -> init(ClientInfo, MaybeWillMsg) ->
ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
SessionConf = emqx_session:get_session_conf(ClientInfo), SessionConf = emqx_session:get_session_conf(ClientInfo),
#{ #{
registry => emqx_mqttsn_registry:init(), registry => emqx_mqttsn_registry:init(),
session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf) session => emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf)
}. }.
registry(#{registry := Registry}) -> registry(#{registry := Registry}) ->