From 8c1a1d21a71df090c5375a8160fee958ba51a607 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 19 Mar 2024 09:51:55 -0300 Subject: [PATCH 1/2] feat(session): pass will message down to session when creating/opening --- .../emqx_persistent_session_ds_SUITE.erl | 7 ++- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/src/emqx_channel.erl | 35 ++++++----- apps/emqx/src/emqx_cm.erl | 23 ++++--- apps/emqx/src/emqx_persistent_session_ds.erl | 52 +++++++++++----- apps/emqx/src/emqx_persistent_session_ds.hrl | 1 + .../src/emqx_persistent_session_ds_state.erl | 11 ++++ apps/emqx/src/emqx_session.erl | 53 +++++++++------- apps/emqx/src/emqx_session_mem.erl | 26 ++++++-- apps/emqx/test/emqx_channel_SUITE.erl | 60 ++++++------------- apps/emqx/test/emqx_cm_SUITE.erl | 13 +++- apps/emqx/test/emqx_connection_SUITE.erl | 3 +- .../test/emqx_persistent_session_SUITE.erl | 48 +++++++++++++++ apps/emqx/test/emqx_session_mem_SUITE.erl | 2 + apps/emqx/test/emqx_ws_connection_SUITE.erl | 3 +- .../src/emqx_eviction_agent.app.src | 2 +- .../src/emqx_eviction_agent.erl | 23 +++++-- .../src/emqx_eviction_agent_channel.erl | 12 ++-- .../proto/emqx_eviction_agent_proto_v2.erl | 4 ++ .../proto/emqx_eviction_agent_proto_v3.erl | 41 +++++++++++++ .../src/emqx_mqttsn_channel.erl | 5 +- .../src/emqx_mqttsn_session.erl | 6 +- 22 files changed, 300 insertions(+), 131 deletions(-) create mode 100644 apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 736c911ff..1f358e3c0 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -25,6 +25,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -244,8 +245,9 @@ t_session_subscription_idempotency(Config) -> fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] ), ?assertMatch( #{SubTopicFilter := #{}}, @@ -321,8 +323,9 @@ t_session_unsubscription_idempotency(Config) -> fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] ), ?assertEqual( #{}, diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 11ac8f582..71cfccdea 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -26,6 +26,7 @@ {emqx_ds,4}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. +{emqx_eviction_agent,3}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_fs,1}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a3480e232..59786afea 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -584,11 +584,12 @@ process_connect( AckProps, Channel = #channel{ conninfo = ConnInfo, - clientinfo = ClientInfo + clientinfo = ClientInfo, + will_msg = MaybeWillMsg } ) -> #{clean_start := CleanStart} = ConnInfo, - case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of + case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, MaybeWillMsg) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel)); @@ -1019,7 +1020,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps ), - return_connack( ?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), ensure_keepalive(NAckProps, Channel) @@ -1378,9 +1378,9 @@ handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> handle_timeout( _TRef, will_message = TimerName, - Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} + Channel = #channel{will_msg = WillMsg} ) -> - (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), + (WillMsg =/= undefined) andalso publish_will_msg(Channel), {ok, clean_timer(TimerName, Channel#channel{will_msg = undefined})}; handle_timeout( _TRef, @@ -2302,19 +2302,17 @@ maybe_publish_will_msg( maybe_publish_will_msg( _Reason, Channel = #channel{ - conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg + conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId} } ) -> %% Unconditionally publish will message for MQTT 3.1.1 ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), - _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), + _ = publish_will_msg(Channel), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( Reason, Channel = #channel{ - clientinfo = ClientInfo, - conninfo = #{clientid := ClientId}, - will_msg = WillMsg + conninfo = #{clientid := ClientId} } ) when Reason =:= expired orelse @@ -2332,12 +2330,11 @@ maybe_publish_will_msg( %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), - _ = publish_will_msg(ClientInfo, WillMsg), + _ = publish_will_msg(Channel), remove_willmsg(Channel); maybe_publish_will_msg( takenover, Channel = #channel{ - clientinfo = ClientInfo, will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2355,7 +2352,7 @@ maybe_publish_will_msg( case will_delay_interval(WillMsg) of 0 -> ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), - _ = publish_will_msg(ClientInfo, WillMsg); + _ = publish_will_msg(Channel); I when I > 0 -> %% @NOTE Non-normative comment in MQTT 5.0 spec %% """ @@ -2370,7 +2367,6 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{ - clientinfo = ClientInfo, will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2381,7 +2377,7 @@ maybe_publish_will_msg( ?tp(debug, maybe_publish_will_msg_other_publish, #{ clientid => ClientId, reason => Reason }), - _ = publish_will_msg(ClientInfo, WillMsg), + _ = publish_will_msg(Channel), remove_willmsg(Channel); I when I > 0 -> ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), @@ -2396,8 +2392,11 @@ will_delay_interval(WillMsg) -> ). publish_will_msg( - ClientInfo = #{mountpoint := MountPoint}, - Msg = #message{topic = Topic} + #channel{ + session = Session, + clientinfo = ClientInfo = #{mountpoint := MountPoint}, + will_msg = Msg = #message{topic = Topic} + } ) -> Action = authz_action(Msg), PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, @@ -2417,7 +2416,7 @@ publish_will_msg( false -> NMsg = emqx_mountpoint:mount(MountPoint, Msg), NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)}, - _ = emqx_broker:publish(NMsg2), + ok = emqx_session:publish_will_message(Session, NMsg2), ok end. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c14058f9a..072611949 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -47,7 +47,7 @@ ]). -export([ - open_session/3, + open_session/4, discard_session/1, discard_session/2, takeover_session_begin/1, @@ -110,6 +110,8 @@ chan_pid/0 ]). +-type message() :: emqx_types:message(). + -type chan_pid() :: pid(). -type channel_info() :: { @@ -266,24 +268,29 @@ set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) -> end. %% @doc Open a session. --spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) -> +-spec open_session( + _CleanStart :: boolean(), + emqx_types:clientinfo(), + emqx_types:conninfo(), + emqx_maybe:t(message()) +) -> {ok, #{ session := emqx_session:t(), present := boolean(), replay => _ReplayContext }} | {error, Reason :: term()}. -open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> ok = discard_session(ClientId), ok = emqx_session:destroy(ClientInfo, ConnInfo), - create_register_session(ClientInfo, ConnInfo, Self) + create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, Self) end); -open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> - case emqx_session:open(ClientInfo, ConnInfo) of + case emqx_session:open(ClientInfo, ConnInfo, MaybeWillMsg) of {true, Session, ReplayContext} -> ok = register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, replay => ReplayContext}}; @@ -293,8 +300,8 @@ open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo end end). -create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) -> - Session = emqx_session:create(ClientInfo, ConnInfo), +create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) -> + Session = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg), ok = register_channel(ClientId, ChanPid, ConnInfo), {ok, #{session => Session, present => false}}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d0c839598..72014bd37 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -34,8 +34,8 @@ %% Session API -export([ - create/3, - open/3, + create/4, + open/4, destroy/1 ]). @@ -66,6 +66,11 @@ terminate/2 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + %% Managment APIs: -export([ list_client_subscriptions/1 @@ -88,7 +93,7 @@ -ifdef(TEST). -export([ - session_open/2, + session_open/3, list_all_sessions/0 ]). -endif. @@ -155,6 +160,7 @@ -type stream_state() :: #srs{}. +-type message() :: emqx_types:message(). -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type millisecond() :: non_neg_integer(). -type clientinfo() :: emqx_types:clientinfo(). @@ -181,14 +187,14 @@ %% --spec create(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). -create(#{clientid := ClientID}, ConnInfo, Conf) -> - ensure_timers(session_ensure_new(ClientID, ConnInfo, Conf)). +create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) -> + ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)). --spec open(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> +open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we @@ -196,7 +202,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - case session_open(ClientID, ConnInfo) of + case session_open(ClientID, ConnInfo, MaybeWillMsg) of Session0 = #{} -> Session = Session0#{props => Conf}, {true, ensure_timers(Session), []}; @@ -679,9 +685,9 @@ sync(ClientId) -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id(), emqx_types:conninfo()) -> +-spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, NewConnInfo) -> +session_open(SessionId, NewConnInfo, MaybeWillMsg) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -699,7 +705,8 @@ session_open(SessionId, NewConnInfo) -> S3 = emqx_persistent_session_ds_state:set_peername( maps:get(peername, NewConnInfo), S2 ), - S = emqx_persistent_session_ds_state:commit(S3), + S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), + S = emqx_persistent_session_ds_state:commit(S4), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -714,9 +721,14 @@ session_open(SessionId, NewConnInfo) -> false end. --spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> +-spec session_ensure_new( + id(), + emqx_types:conninfo(), + emqx_maybe:t(message()), + emqx_session:conf() +) -> session(). -session_ensure_new(Id, ConnInfo, Conf) -> +session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -738,7 +750,8 @@ session_ensure_new(Id, ConnInfo, Conf) -> ?committed(?QOS_2) ] ), - S = emqx_persistent_session_ds_state:commit(S4), + S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), + S = emqx_persistent_session_ds_state:commit(S5), #{ id => Id, props => Conf, @@ -1191,6 +1204,15 @@ seqno_diff(?QOS_1, A, B) -> seqno_diff(?QOS_2, A, B) -> A - B. +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(session(), message()) -> ok. +publish_will_message(_Session, #message{} = _WillMsg) -> + %% TODO + ok. + %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 17e5d31e4..67ef485d5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -75,5 +75,6 @@ %% Unique integer used to create unique identities -define(last_id, last_id). -define(peername, peername). +-define(will_message, will_message). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 167709d8a..5de74fb06 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,6 +30,7 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_expiry_interval/1, set_expiry_interval/2]). +-export([get_will_message/1, set_will_message/2]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). @@ -58,6 +59,8 @@ %% Type declarations %%================================================================================ +-type message() :: emqx_types:message(). + -type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. @@ -288,6 +291,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_will_message(t()) -> emqx_maybe:t(message()). +get_will_message(Rec) -> + get_meta(?will_message, Rec). + +-spec set_will_message(emqx_maybe:t(message()), t()) -> t(). +set_will_message(Val, Rec) -> + set_meta(?will_message, Val, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 0723452f1..63963c242 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -55,8 +55,8 @@ -endif. -export([ - create/2, - open/2, + create/3, + open/3, destroy/1, destroy/2 ]). @@ -88,6 +88,11 @@ terminate/3 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + % Timers -export([ ensure_timer/3, @@ -175,57 +180,57 @@ %% Behaviour %% ------------------------------------------------------------------- --callback create(clientinfo(), conninfo(), conf()) -> +-callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> t(). --callback open(clientinfo(), conninfo(), conf()) -> +-callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. +-callback publish_will_message(t(), message()) -> ok. %%-------------------------------------------------------------------- %% Create a Session %%-------------------------------------------------------------------- --spec create(clientinfo(), conninfo()) -> t(). -create(ClientInfo, ConnInfo) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) -> t(). +create(ClientInfo, ConnInfo, MaybeWillMsg) -> Conf = get_session_conf(ClientInfo), - create(ClientInfo, ConnInfo, Conf). - -create(ClientInfo, ConnInfo, Conf) -> % FIXME error conditions - create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf). + create( + hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, MaybeWillMsg, Conf + ). -create(Mod, ClientInfo, ConnInfo, Conf) -> +create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> % FIXME error conditions - Session = Mod:create(ClientInfo, ConnInfo, Conf), + Session = Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]), Session. --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) -> {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. -open(ClientInfo, ConnInfo) -> +open(ClientInfo, ConnInfo, MaybeWillMsg) -> Conf = get_session_conf(ClientInfo), Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo), %% NOTE %% Try to look the existing session up in session stores corresponding to the given %% `Mods` in order, starting from the last one. - case try_open(Mods, ClientInfo, ConnInfo, Conf) of + case try_open(Mods, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> %% NOTE %% Nothing was found, create a new session with the `Default` implementation. - {false, create(Default, ClientInfo, ConnInfo, Conf)} + {false, create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf)} end. -try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) -> - case try_open(Rest, ClientInfo, ConnInfo, Conf) of +try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> + case try_open(Rest, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> - Mod:open(ClientInfo, ConnInfo, Conf) + Mod:open(ClientInfo, ConnInfo, MaybeWillMsg, Conf) end; -try_open([], _ClientInfo, _ConnInfo, _Conf) -> +try_open([], _ClientInfo, _ConnInfo, _MaybeWillMsg, _Conf) -> false. -spec get_session_conf(clientinfo()) -> conf(). @@ -635,3 +640,11 @@ choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) -> run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(t(), message()) -> ok. +publish_will_message(Session, WillMsg) -> + ?IMPL(Session):publish_will_message(Session, WillMsg). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 439057384..95222a4b3 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -58,8 +58,8 @@ -endif. -export([ - create/3, - open/3, + create/4, + open/4, destroy/1 ]). @@ -106,6 +106,11 @@ dedup/4 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + %% Export for CT -export([set_field/3]). @@ -127,6 +132,7 @@ -type session() :: #session{}. -type replayctx() :: [emqx_types:message()]. +-type message() :: emqx_types:message(). -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). @@ -151,11 +157,12 @@ %% Init a Session %%-------------------------------------------------------------------- --spec create(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). create( #{zone := Zone, clientid := ClientId}, #{expiry_interval := EI, receive_maximum := ReceiveMax}, + _MaybeWillMsg, Conf ) -> QueueOpts = get_mqueue_conf(Zone), @@ -200,9 +207,9 @@ destroy(_Session) -> %% Open a (possibly existing) Session %%-------------------------------------------------------------------- --spec open(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. -open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> +open(ClientInfo = #{clientid := ClientId}, ConnInfo, _MaybeWillMsg, Conf) -> case emqx_cm:takeover_session_begin(ClientId) of {ok, SessionRemote, TakeoverState} -> Session0 = resume(ClientInfo, SessionRemote), @@ -797,6 +804,15 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(session(), message()) -> ok. +publish_will_message(#session{}, #message{} = WillMsg) -> + _ = emqx_broker:publish(WillMsg), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 078272571..4b3fa1209 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -288,13 +288,7 @@ t_handle_in_puback_id_not_found(_) -> % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)). t_bad_receive_maximum(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), C1 = channel(#{conn_state => idle}), {shutdown, protocol_error, _, _} = @@ -304,13 +298,7 @@ t_bad_receive_maximum(_) -> ). t_override_client_receive_maximum(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0), C1 = channel(#{conn_state => idle}), @@ -460,13 +448,7 @@ t_handle_in_expected_packet(_) -> emqx_channel:handle_in(packet, channel()). t_process_connect(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = emqx_channel:process_connect(#{}, channel(#{conn_state => idle})). @@ -604,13 +586,7 @@ t_handle_out_connack_sucess(_) -> ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_out_connack_response_information(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, @@ -624,13 +600,7 @@ t_handle_out_connack_response_information(_) -> ). t_handle_out_connack_not_response_information(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = @@ -1017,13 +987,7 @@ t_ws_cookie_init(_) -> t_flapping_detect(_) -> emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000), Parent = self(), - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), IdleChannel = channel( clientinfo(#{ @@ -1128,7 +1092,8 @@ session(ClientInfo, InitFields) when is_map(InitFields) -> #{ receive_maximum => 0, expiry_interval => 0 - } + }, + _WillMsg = undefined ), maps:fold( fun(Field, Value, SessionAcc) -> @@ -1139,6 +1104,15 @@ session(ClientInfo, InitFields) when is_map(InitFields) -> InitFields ). +mock_cm_open_session() -> + ok = meck:expect( + emqx_cm, + open_session, + fun(true, _ClientInfo, _ConnInfo, _MaybeWillMsg) -> + {ok, #{session => session(), present => false}} + end + ). + %% conn: 5/s; overall: 10/s quota() -> emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index e47cf6730..16245e272 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -59,6 +59,13 @@ init_per_suite(Config) -> end_per_suite(Config) -> emqx_cth_suite:stop(proplists:get_value(apps, Config)). +%%-------------------------------------------------------------------- +%% Helper fns +%%-------------------------------------------------------------------- + +open_session(CleanStart, ClientInfo, ConnInfo) -> + emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, _WillMsg = undefined). + %%-------------------------------------------------------------------- %% TODO: Add more test cases %%-------------------------------------------------------------------- @@ -129,10 +136,10 @@ t_open_session(_) -> receive_maximum => 100 }, {ok, #{session := Session1, present := false}} = - emqx_cm:open_session(true, ClientInfo, ConnInfo), + open_session(true, ClientInfo, ConnInfo), ?assertEqual(100, emqx_session:info(inflight_max, Session1)), {ok, #{session := Session2, present := false}} = - emqx_cm:open_session(true, ClientInfo, ConnInfo), + open_session(true, ClientInfo, ConnInfo), ?assertEqual(100, emqx_session:info(inflight_max, Session2)), emqx_cm:unregister_channel(<<"clientid">>), @@ -163,7 +170,7 @@ t_open_session_race_condition(_) -> Parent = self(), OpenASession = fun() -> timer:sleep(rand:uniform(100)), - OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)), + OpenR = open_session(true, ClientInfo, ConnInfo), Parent ! OpenR, case OpenR of {ok, _} -> diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index f3f470d1e..7fffa3374 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -679,7 +679,8 @@ channel(InitFields) -> }, Session = emqx_session:create( ClientInfo, - #{receive_maximum => 0, expiry_interval => 1000} + #{receive_maximum => 0, expiry_interval => 1000}, + _WillMsg = undefined ), maps:fold( fun(Field, Value, Channel) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index a5c171f67..088023ec2 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -1240,6 +1241,49 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message1(Config) -> + ConnFun = ?config(conn_fun, Config), + WillTopic = ?config(topic, Config), + WillPayload = <<"will message">>, + ClientId = ?config(client_id, Config), + ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000), + + ?check_trace( + #{timetrap => 15_000}, + begin + ok = emqx:subscribe(WillTopic, #{qos => 2}), + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 1}}, + {will_topic, WillTopic}, + {will_payload, WillPayload}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 1}} + | Config + ]), + {{ok, _}, {ok, _}} = + ?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}), + ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + + ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000), + %% No duplicates + ?assertNotReceive({deliver, WillTopic, _}, 1_000), + + ok + end, + [] + ), + ok. +t_will_message1(init, Config) -> + Config; +t_will_message1('end', _Config) -> + ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}), + ok. + get_topicwise_order(Msgs) -> maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). @@ -1267,3 +1311,7 @@ pick_respective_msgs(MsgRefs, Msgs) -> debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), ct:pal("*** State:~n~p", [Info]). + +on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) -> + ?tp(client_connack, #{}), + ok. diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index ec98388d7..ec08a118d 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -67,6 +67,7 @@ t_session_init(_) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, + _WillMsg = undefined, emqx_session:get_session_conf(ClientInfo) ), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), @@ -531,6 +532,7 @@ session(InitFields) when is_map(InitFields) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, + _WillMsg = undefined, emqx_session:get_session_conf(ClientInfo) ), maps:fold( diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index b9e40b454..59f201864 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -613,7 +613,8 @@ channel(InitFields) -> }, Session = emqx_session:create( ClientInfo, - #{receive_maximum => 0, expiry_interval => 0} + #{receive_maximum => 0, expiry_interval => 0}, + _WillMsg = undefined ), maps:fold( fun(Field, Value, Channel) -> diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index cc415d495..10a464f26 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.1.5"}, + {vsn, "5.1.6"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 2f0bc78a1..5c02da6fc 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -27,12 +27,15 @@ evict_connections/1, evict_sessions/2, evict_sessions/3, - purge_sessions/1, - evict_session_channel/3 + purge_sessions/1 ]). %% RPC targets --export([all_local_channels_count/0]). +-export([ + all_local_channels_count/0, + evict_session_channel/3, + do_evict_session_channel_v3/4 +]). -behaviour(gen_server). @@ -397,12 +400,23 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) -> Res end. +%% RPC target for `emqx_eviction_agent_proto_v2' -spec evict_session_channel( emqx_types:clientid(), emqx_types:conninfo(), emqx_types:clientinfo() ) -> supervisor:startchild_ret(). evict_session_channel(ClientId, ConnInfo, ClientInfo) -> + do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, _WillMsg = undefined). + +%% RPC target for `emqx_eviction_agent_proto_v3' +-spec do_evict_session_channel_v3( + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo(), + emqx_maybe:t(emqx_types:message()) +) -> supervisor:startchild_ret(). +do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, MaybeWillMsg) -> ?SLOG(info, #{ msg => "evict_session_channel", client_id => ClientId, @@ -412,7 +426,8 @@ evict_session_channel(ClientId, ConnInfo, ClientInfo) -> Result = emqx_eviction_agent_channel:start_supervised( #{ conninfo => ConnInfo, - clientinfo => ClientInfo + clientinfo => ClientInfo, + will_message => MaybeWillMsg } ), ?SLOG( diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index 55cc4a2c9..20ee129f3 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -32,7 +32,8 @@ -type opts() :: #{ conninfo := emqx_types:conninfo(), - clientinfo := emqx_types:clientinfo() + clientinfo := emqx_types:clientinfo(), + will_message => emqx_maybe:t(emqx_types:message()) }. %%-------------------------------------------------------------------- @@ -81,11 +82,12 @@ stop(Pid) -> %% gen_server API %%-------------------------------------------------------------------- -init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo}]) -> +init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo} = Opts]) -> process_flag(trap_exit, true), ClientInfo = clientinfo(OldClientInfo), ConnInfo = conninfo(OldConnInfo), - case open_session(ConnInfo, ClientInfo) of + MaybeWillMsg = maps:get(will_message, Opts, undefined), + case open_session(ConnInfo, ClientInfo, MaybeWillMsg) of {ok, Channel0} -> case set_expiry_timer(Channel0) of {ok, Channel1} -> @@ -221,9 +223,9 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) -> {error, should_be_expired} end. -open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> +open_session(ConnInfo, #{clientid := ClientId} = ClientInfo, MaybeWillMsg) -> Channel = channel(ConnInfo, ClientInfo), - case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of + case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo, MaybeWillMsg) of {ok, #{present := false}} -> ?SLOG( info, diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl index 7c8f47faa..0db75d326 100644 --- a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl @@ -8,6 +8,7 @@ -export([ introduced_in/0, + deprecated_since/0, evict_session_channel/4, @@ -20,6 +21,9 @@ introduced_in() -> "5.2.1". +deprecated_since() -> + "5.7.0". + -spec evict_session_channel( node(), emqx_types:clientid(), diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl new file mode 100644 index 000000000..45950a006 --- /dev/null +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_eviction_agent_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + all_channels_count/2, + + %% Changed in v3: + evict_session_channel/5 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()). +all_channels_count(Nodes, Timeout) -> + erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout). + +%% Changed in v3: +-spec evict_session_channel( + node(), + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo(), + emqx_maybe:t(emqx_types:message()) +) -> supervisor:startchild_err() | emqx_rpc:badrpc(). +evict_session_channel(Node, ClientId, ConnInfo, ClientInfo, MaybeWillMsg) -> + rpc:call( + Node, + emqx_eviction_agent, + do_evict_session_channel_v3, + [ClientId, ConnInfo, ClientInfo, MaybeWillMsg] + ). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 79d4c2164..b840d53a3 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -373,10 +373,11 @@ process_connect( Channel = #channel{ ctx = Ctx, conninfo = ConnInfo = #{clean_start := CleanStart}, - clientinfo = ClientInfo + clientinfo = ClientInfo, + will_msg = MaybeWillMsg } ) -> - SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end, + SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT, MaybeWillMsg) end, case emqx_gateway_ctx:open_session( Ctx, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl index edae3dcf8..94f470f88 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -19,7 +19,7 @@ -export([registry/1, set_registry/2]). -export([ - init/1, + init/2, info/1, info/2, stats/1 @@ -52,12 +52,12 @@ -export_type([session/0]). -init(ClientInfo) -> +init(ClientInfo, MaybeWillMsg) -> ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, SessionConf = emqx_session:get_session_conf(ClientInfo), #{ registry => emqx_mqttsn_registry:init(), - session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf) + session => emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf) }. registry(#{registry := Registry}) -> From 0f426e6e77273bc18640698defaa0edc856b1cdf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 19 Mar 2024 15:45:52 -0300 Subject: [PATCH 2/2] feat(ds): make durable sessions handle will messages Fixes https://emqx.atlassian.net/browse/EMQX-10431 --- .../emqx_persistent_session_ds_SUITE.erl | 73 +++- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/src/emqx_channel.erl | 90 +++-- apps/emqx/src/emqx_cm.erl | 77 +++- apps/emqx/src/emqx_persistent_session_ds.erl | 58 ++- apps/emqx/src/emqx_persistent_session_ds.hrl | 1 + .../emqx_persistent_session_ds_gc_worker.erl | 110 +++++- .../src/emqx_persistent_session_ds_state.erl | 28 +- apps/emqx/src/emqx_session.erl | 16 +- apps/emqx/src/emqx_session_mem.erl | 13 +- apps/emqx/src/proto/emqx_cm_proto_v2.erl | 4 + apps/emqx/src/proto/emqx_cm_proto_v3.erl | 106 ++++++ apps/emqx/test/emqx_cth_suite.erl | 1 + .../test/emqx_persistent_session_SUITE.erl | 86 ++++- apps/emqx/test/emqx_takeover_SUITE.erl | 343 +++++++++++------- 15 files changed, 789 insertions(+), 218 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_cm_proto_v3.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 1f358e3c0..fae54d612 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -25,7 +25,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -144,6 +143,7 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> start_client(Opts0 = #{}) -> Defaults = #{ + port => 1883, proto_ver => v5, properties => #{'Session-Expiry-Interval' => 300} }, @@ -190,6 +190,23 @@ list_all_subscriptions(Node) -> list_all_pubranges(Node) -> erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). +session_open(Node, ClientId) -> + ClientInfo = #{}, + ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, + erpc:call( + Node, + emqx_persistent_session_ds, + session_open, + [ClientId, ClientInfo, ConnInfo, WillMsg] + ). + +force_last_alive_at(ClientId, Time) -> + {ok, S0} = emqx_persistent_session_ds_state:open(ClientId), + S = emqx_persistent_session_ds_state:set_last_alive_at(Time, S0), + _ = emqx_persistent_session_ds_state:commit(S), + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -244,11 +261,7 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{peername => {undefined, undefined}}, - WillMsg = undefined, - Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] - ), + Session = session_open(Node1, ClientId), ?assertMatch( #{SubTopicFilter := #{}}, emqx_session:info(subscriptions, Session) @@ -322,11 +335,7 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{peername => {undefined, undefined}}, - WillMsg = undefined, - Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] - ), + Session = session_open(Node1, ClientId), ?assertEqual( #{}, emqx_session:info(subscriptions, Session) @@ -555,6 +564,7 @@ t_session_gc(Config) -> ), %% Clients are still alive; no session is garbage collected. + ?tp(notice, "waiting for gc", #{}), ?assertMatch( {ok, _}, ?block_until( @@ -567,9 +577,11 @@ t_session_gc(Config) -> ), ?assertMatch([_, _, _], list_all_sessions(Node1), sessions), ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions), + ?tp(notice, "gc ran", #{}), %% Now we disconnect 2 of them; only those should be GC'ed. + ?tp(notice, "disconnecting client1", #{}), ?assertMatch( {ok, {ok, _}}, ?wait_async_action( @@ -674,3 +686,42 @@ t_session_replay_retry(_Config) -> [maps:with([topic, payload, qos], P) || P <- Pubs0], [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2] ). + +%% Check that we send will messages when performing GC without relying on timers set by +%% the channel process. +t_session_gc_will_message(_Config) -> + ?check_trace( + #{timetrap => 10_000}, + begin + WillTopic = <<"will/t">>, + ok = emqx:subscribe(WillTopic, #{qos => 2}), + ClientId = <<"will_msg_client">>, + Client = start_client(#{ + clientid => ClientId, + will_topic => WillTopic, + will_payload => <<"will payload">>, + will_qos => 0, + will_props => #{'Will-Delay-Interval' => 300} + }), + {ok, _} = emqtt:connect(Client), + %% Use reason code =/= `?RC_SUCCESS' to allow will message + {ok, {ok, _}} = + ?wait_async_action( + emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + #{?snk_kind := emqx_cm_clean_down} + ), + ?assertNotReceive({deliver, WillTopic, _}), + %% Set fake `last_alive_at' to trigger immediate will message. + force_last_alive_at(ClientId, _Time = 0), + {ok, {ok, _}} = + ?wait_async_action( + emqx_persistent_session_ds_gc_worker:check_session(ClientId), + #{?snk_kind := session_gc_published_will_msg} + ), + ?assertReceive({deliver, WillTopic, _}), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 71cfccdea..b35808911 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -12,6 +12,7 @@ {emqx_broker,1}. {emqx_cm,1}. {emqx_cm,2}. +{emqx_cm,3}. {emqx_conf,1}. {emqx_conf,2}. {emqx_conf,3}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 59786afea..0a0f37bcb 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -64,6 +64,12 @@ maybe_nack/1 ]). +%% Export for DS session GC worker and session implementations +-export([ + will_delay_interval/1, + prepare_will_message_for_publishing/2 +]). + %% Exports for CT -export([set_field/3]). @@ -885,9 +891,10 @@ do_unsubscribe( %%-------------------------------------------------------------------- %% MQTT-v5.0: 3.14.4 DISCONNECT Actions -maybe_clean_will_msg(?RC_SUCCESS, Channel) -> +maybe_clean_will_msg(?RC_SUCCESS, Channel = #channel{session = Session0}) -> %% [MQTT-3.14.4-3] - Channel#channel{will_msg = undefined}; + Session = emqx_session:clear_will_message(Session0), + Channel#channel{will_msg = undefined, session = Session}; maybe_clean_will_msg(_ReasonCode, Channel) -> Channel. @@ -1204,6 +1211,9 @@ handle_call( ), Channel0 = maybe_publish_will_msg(takenover, Channel), disconnect_and_shutdown(takenover, AllPendings, Channel0); +handle_call(takeover_kick, Channel) -> + Channel0 = maybe_publish_will_msg(takenover, Channel), + disconnect_and_shutdown(takenover, ok, Channel0); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; handle_call( @@ -2301,17 +2311,17 @@ maybe_publish_will_msg( Channel; maybe_publish_will_msg( _Reason, - Channel = #channel{ + Channel0 = #channel{ conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId} } ) -> %% Unconditionally publish will message for MQTT 3.1.1 ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), - _ = publish_will_msg(Channel), - Channel#channel{will_msg = undefined}; + Channel = publish_will_msg(Channel0), + remove_willmsg(Channel); maybe_publish_will_msg( Reason, - Channel = #channel{ + Channel0 = #channel{ conninfo = #{clientid := ClientId} } ) when @@ -2329,12 +2339,20 @@ maybe_publish_will_msg( %% d. internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), - _ = publish_will_msg(Channel), - remove_willmsg(Channel); + %% NOTE! For durable sessions, `?chan_terminating' does NOT imply that the session is + %% gone. + case is_durable_session(Channel0) andalso Reason =:= ?chan_terminating of + false -> + ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), + + Channel = publish_will_msg(Channel0), + remove_willmsg(Channel); + true -> + Channel0 + end; maybe_publish_will_msg( takenover, - Channel = #channel{ + Channel0 = #channel{ will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2352,7 +2370,8 @@ maybe_publish_will_msg( case will_delay_interval(WillMsg) of 0 -> ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), - _ = publish_will_msg(Channel); + Channel = publish_will_msg(Channel0), + ok; I when I > 0 -> %% @NOTE Non-normative comment in MQTT 5.0 spec %% """ @@ -2361,12 +2380,13 @@ maybe_publish_will_msg( %% before the Will Message is published. %% """ ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), + Channel = Channel0, skip end, remove_willmsg(Channel); maybe_publish_will_msg( Reason, - Channel = #channel{ + Channel0 = #channel{ will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2377,11 +2397,11 @@ maybe_publish_will_msg( ?tp(debug, maybe_publish_will_msg_other_publish, #{ clientid => ClientId, reason => Reason }), - _ = publish_will_msg(Channel), + Channel = publish_will_msg(Channel0), remove_willmsg(Channel); I when I > 0 -> ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), - ensure_timer(will_message, timer:seconds(I), Channel) + ensure_timer(will_message, timer:seconds(I), Channel0) end. will_delay_interval(WillMsg) -> @@ -2394,15 +2414,15 @@ will_delay_interval(WillMsg) -> publish_will_msg( #channel{ session = Session, - clientinfo = ClientInfo = #{mountpoint := MountPoint}, + clientinfo = ClientInfo, will_msg = Msg = #message{topic = Topic} - } + } = Channel ) -> - Action = authz_action(Msg), - PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, - ClientBanned = emqx_banned:check(ClientInfo), - case PublishingDisallowed orelse ClientBanned of - true -> + case prepare_will_message_for_publishing(ClientInfo, Msg) of + {ok, PreparedMessage} -> + NSession = emqx_session:publish_will_message_now(Session, PreparedMessage), + Channel#channel{session = NSession}; + {error, #{client_banned := ClientBanned, publishing_disallowed := PublishingDisallowed}} -> ?tp( warning, last_will_testament_publish_denied, @@ -2412,12 +2432,23 @@ publish_will_msg( publishing_disallowed => PublishingDisallowed } ), - ok; + Channel + end. + +prepare_will_message_for_publishing( + ClientInfo = #{mountpoint := MountPoint}, + Msg = #message{topic = Topic} +) -> + Action = authz_action(Msg), + PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, + ClientBanned = emqx_banned:check(ClientInfo), + case PublishingDisallowed orelse ClientBanned of + true -> + {error, #{client_banned => ClientBanned, publishing_disallowed => PublishingDisallowed}}; false -> NMsg = emqx_mountpoint:mount(MountPoint, Msg), - NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)}, - ok = emqx_session:publish_will_message(Session, NMsg2), - ok + PreparedMessage = NMsg#message{timestamp = emqx_message:timestamp_now()}, + {ok, PreparedMessage} end. %%-------------------------------------------------------------------- @@ -2529,6 +2560,15 @@ remove_willmsg(Channel = #channel{timers = Timers}) -> timers = maps:remove(will_message, Timers) } end. + +is_durable_session(#channel{session = Session}) -> + case emqx_session:info(impl, Session) of + emqx_persistent_session_ds -> + true; + _ -> + false + end. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 072611949..421452554 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -53,7 +53,8 @@ takeover_session_begin/1, takeover_session_end/1, kick_session/1, - kick_session/2 + kick_session/2, + takeover_kick/1 ]). -export([ @@ -100,6 +101,7 @@ takeover_session/2, takeover_finish/2, do_kick_session/3, + do_takeover_kick_session_v3/2, do_get_chan_info/2, do_get_chan_stats/2, do_get_chann_conn_mod/2 @@ -122,6 +124,8 @@ -type takeover_state() :: {_ConnMod :: module(), _ChanPid :: pid()}. +-define(BPAPI_NAME, emqx_cm). + -define(CHAN_STATS, [ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, @@ -352,6 +356,38 @@ pick_channel(ClientId) -> ChanPid end. +%% Used by `emqx_persistent_session_ds' +-spec takeover_kick(emqx_types:clientid()) -> ok. +takeover_kick(ClientId) -> + case lookup_channels(ClientId) of + [] -> + ok; + ChanPids -> + lists:foreach( + fun(Pid) -> + do_takeover_session(ClientId, Pid) + end, + ChanPids + ) + end. + +%% Used by `emqx_persistent_session_ds'. +%% We stop any running channels with reason `takenover' so that correct reason codes and +%% will message processing may take place. For older BPAPI nodes, we don't have much +%% choice other than calling the old `discard_session' code. +do_takeover_session(ClientId, Pid) -> + Node = node(Pid), + case emqx_bpapi:supported_version(Node, ?BPAPI_NAME) of + undefined -> + %% Race: node (re)starting? Assume v2. + discard_session(ClientId, Pid); + Vsn when Vsn =< 2 -> + discard_session(ClientId, Pid); + _Vsn -> + takeover_kick_session(ClientId, Pid) + end. + +%% Used only by `emqx_session_mem' takeover_finish(ConnMod, ChanPid) -> request_stepdown( {takeover, 'end'}, @@ -360,6 +396,7 @@ takeover_finish(ConnMod, ChanPid) -> ). %% @doc RPC Target @ emqx_cm_proto_v2:takeover_session/2 +%% Used only by `emqx_session_mem' takeover_session(ClientId, Pid) -> try do_takeover_begin(ClientId, Pid) @@ -415,7 +452,7 @@ discard_session(ClientId) when is_binary(ClientId) -> | {ok, emqx_session:t() | _ReplayContext} | {error, term()} when - Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. + Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick. request_stepdown(Action, ConnMod, Pid) -> Timeout = case Action == kick orelse Action == discard of @@ -496,7 +533,19 @@ do_kick_session(Action, ClientId, ChanPid) when node(ChanPid) =:= node() -> ok = request_stepdown(Action, ConnMod, ChanPid) end. -%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +%% @doc RPC Target for emqx_cm_proto_v3:takeover_kick_session/3 +-spec do_takeover_kick_session_v3(emqx_types:clientid(), chan_pid()) -> ok. +do_takeover_kick_session_v3(ClientId, ChanPid) when node(ChanPid) =:= node() -> + case do_get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = request_stepdown(takeover_kick, ConnMod, ChanPid) + end. + +%% @private This function is shared for session `kick' and `discard' (as the first arg +%% Action). kick_session(Action, ClientId, ChanPid) -> try wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid)) @@ -519,6 +568,28 @@ kick_session(Action, ClientId, ChanPid) -> ) end. +takeover_kick_session(ClientId, ChanPid) -> + try + wrap_rpc(emqx_cm_proto_v3:takeover_kick_session(ClientId, ChanPid)) + catch + Error:Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?SLOG( + error, + #{ + msg => "failed_to_kick_session_on_remote_node", + node => node(ChanPid), + action => takeover, + error => Error, + reason => Reason + }, + #{clientid => ClientId} + ) + end. + kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 72014bd37..cc861f71f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -68,7 +68,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). %% Managment APIs: @@ -93,7 +94,7 @@ -ifdef(TEST). -export([ - session_open/3, + session_open/4, list_all_sessions/0 ]). -endif. @@ -189,20 +190,20 @@ -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). -create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) -> - ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)). +create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> + ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)). -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> +open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we %% have disconnected channels holding onto session state. Ideally, we should %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. - ok = emqx_cm:discard_session(ClientID), - case session_open(ClientID, ConnInfo, MaybeWillMsg) of + ok = emqx_cm:takeover_kick(ClientID), + case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of Session0 = #{} -> Session = Session0#{props => Conf}, {true, ensure_timers(Session), []}; @@ -613,7 +614,8 @@ disconnect(Session = #{s := S0}, ConnInfo) -> {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. -terminate(_Reason, _Session = #{id := Id, s := S}) -> +terminate(_Reason, Session = #{id := Id, s := S}) -> + maybe_set_will_message_timer(Session), _ = emqx_persistent_session_ds_state:commit(S), ?tp(debug, persistent_session_ds_terminate, #{id => Id}), ok. @@ -685,9 +687,9 @@ sync(ClientId) -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) -> +-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, NewConnInfo, MaybeWillMsg) -> +session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -706,7 +708,8 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) -> maps:get(peername, NewConnInfo), S2 ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), - S = emqx_persistent_session_ds_state:commit(S4), + S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4), + S = emqx_persistent_session_ds_state:commit(S5), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -723,12 +726,13 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) -> -spec session_ensure_new( id(), + emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message()), emqx_session:conf() ) -> session(). -session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> +session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -751,7 +755,8 @@ session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> ] ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), - S = emqx_persistent_session_ds_state:commit(S5), + S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5), + S = emqx_persistent_session_ds_state:commit(S6), #{ id => Id, props => Conf, @@ -1208,10 +1213,29 @@ seqno_diff(?QOS_2, A, B) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(session(), message()) -> ok. -publish_will_message(_Session, #message{} = _WillMsg) -> - %% TODO - ok. +-spec clear_will_message(session()) -> session(). +clear_will_message(#{s := S0} = Session) -> + S = emqx_persistent_session_ds_state:clear_will_message(S0), + Session#{s := S}. + +-spec publish_will_message_now(session(), message()) -> session(). +publish_will_message_now(#{} = Session, WillMsg = #message{}) -> + _ = emqx_broker:publish(WillMsg), + clear_will_message(Session). + +maybe_set_will_message_timer(#{id := SessionId, s := S}) -> + case emqx_persistent_session_ds_state:get_will_message(S) of + #message{} = WillMsg -> + WillDelayInterval = emqx_channel:will_delay_interval(WillMsg), + WillDelayInterval > 0 andalso + emqx_persistent_session_ds_gc_worker:check_session_after( + SessionId, + timer:seconds(WillDelayInterval) + ), + ok; + _ -> + ok + end. %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 67ef485d5..56862dfa5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -76,5 +76,6 @@ -define(last_id, last_id). -define(peername, peername). -define(will_message, will_message). +-define(clientinfo, clientinfo). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index a4d1fe638..4f9accfb2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -25,7 +25,9 @@ %% API -export([ - start_link/0 + start_link/0, + check_session/1, + check_session_after/2 ]). %% `gen_server' API @@ -38,6 +40,7 @@ %% call/cast/info records -record(gc, {}). +-record(check_session, {id :: emqx_persistent_session_ds:id()}). %%-------------------------------------------------------------------------------- %% API @@ -46,6 +49,17 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec check_session(emqx_persistent_session_ds:id()) -> ok. +check_session(SessionId) -> + gen_server:cast(?MODULE, #check_session{id = SessionId}). + +-spec check_session_after(emqx_persistent_session_ds:id(), pos_integer()) -> ok. +check_session_after(SessionId, Time0) -> + #{bump_interval := BumpInterval} = gc_context(), + Time = max(Time0, BumpInterval), + _ = erlang:send_after(Time, ?MODULE, #check_session{id = SessionId}), + ok. + %%-------------------------------------------------------------------------------- %% `gen_server' API %%-------------------------------------------------------------------------------- @@ -58,6 +72,9 @@ init(_Opts) -> handle_call(_Call, _From, State) -> {reply, error, State}. +handle_cast(#check_session{id = SessionId}, State) -> + do_check_session(SessionId), + {noreply, State}; handle_cast(_Cast, State) -> {noreply, State}. @@ -65,6 +82,9 @@ handle_info(#gc{}, State) -> try_gc(), ensure_gc_timer(), {noreply, State}; +handle_info(#check_session{id = SessionId}, State) -> + do_check_session(SessionId), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -104,25 +124,65 @@ now_ms() -> erlang:system_time(millisecond). start_gc() -> + #{ + min_last_alive := MinLastAlive, + min_last_alive_will_msg := MinLastAliveWillMsg + } = gc_context(), + gc_loop( + MinLastAlive, MinLastAliveWillMsg, emqx_persistent_session_ds_state:make_session_iterator() + ). + +gc_context() -> GCInterval = emqx_config:get([session_persistence, session_gc_interval]), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), TimeThreshold = max(GCInterval, BumpInterval) * 3, - MinLastAlive = now_ms() - TimeThreshold, - gc_loop(MinLastAlive, emqx_persistent_session_ds_state:make_session_iterator()). + NowMS = now_ms(), + #{ + min_last_alive => NowMS - TimeThreshold, + %% For will messages, we don't need to be so strict as session GC (GC interval is + %% of the order of ~ 10 minutes by default, bump interval ~ 100 ms), otherwise + %% most will be sent very late. + min_last_alive_will_msg => NowMS - BumpInterval * 5, + time_threshold => TimeThreshold, + bump_interval => BumpInterval, + gc_interval => GCInterval + }. -gc_loop(MinLastAlive, It0) -> +gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) -> GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]), case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of {[], _It} -> ok; {Sessions, It} -> - [do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions], - gc_loop(MinLastAlive, It) + [ + do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) + || {SessionId, Metadata} <- Sessions + ], + gc_loop(MinLastAlive, MinLastAliveWillMsg, It) end. -do_gc(SessionId, MinLastAlive, Metadata) -> - #{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata, - case LastAliveAt + EI < MinLastAlive of +do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) -> + #{ + ?last_alive_at := LastAliveAt, + ?expiry_interval := EI, + ?will_message := MaybeWillMessage, + ?clientinfo := ClientInfo + } = Metadata, + IsExpired = LastAliveAt + EI < MinLastAlive, + case + should_send_will_message( + MaybeWillMessage, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg + ) + of + {true, PreparedMessage} -> + _ = emqx_broker:publish(PreparedMessage), + ok = emqx_persistent_session_ds_state:clear_will_message_now(SessionId), + ?tp(session_gc_published_will_msg, #{id => SessionId, msg => PreparedMessage}), + ok; + false -> + ok + end, + case IsExpired of true -> emqx_persistent_session_ds:destroy_session(SessionId), ?tp(debug, ds_session_gc_cleaned, #{ @@ -134,3 +194,35 @@ do_gc(SessionId, MinLastAlive, Metadata) -> false -> ok end. + +should_send_will_message( + undefined = _WillMsg, _ClientInfo, _IsExpired, _LastAliveAt, _MinLastAliveWillMsg +) -> + false; +should_send_will_message(WillMsg, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg) -> + WillDelayIntervalS = emqx_channel:will_delay_interval(WillMsg), + WillDelayInterval = timer:seconds(WillDelayIntervalS), + PastWillDelay = LastAliveAt + WillDelayInterval < MinLastAliveWillMsg, + case PastWillDelay orelse IsExpired of + true -> + case emqx_channel:prepare_will_message_for_publishing(ClientInfo, WillMsg) of + {ok, PreparedMessage} -> + {true, PreparedMessage}; + {error, _} -> + false + end; + false -> + false + end. + +do_check_session(SessionId) -> + case emqx_persistent_session_ds_state:print_session(SessionId) of + #{metadata := Metadata} -> + #{ + min_last_alive := MinLastAlive, + min_last_alive_will_msg := MinLastAliveWillMsg + } = gc_context(), + do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata); + _ -> + ok + end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 5de74fb06..28297964d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,7 +30,8 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_expiry_interval/1, set_expiry_interval/2]). --export([get_will_message/1, set_will_message/2]). +-export([get_clientinfo/1, set_clientinfo/2]). +-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). @@ -291,6 +292,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). +get_clientinfo(Rec) -> + get_meta(?clientinfo, Rec). + +-spec set_clientinfo(emqx_types:clientinfo(), t()) -> t(). +set_clientinfo(Val, Rec) -> + set_meta(?clientinfo, Val, Rec). + -spec get_will_message(t()) -> emqx_maybe:t(message()). get_will_message(Rec) -> get_meta(?will_message, Rec). @@ -299,6 +308,23 @@ get_will_message(Rec) -> set_will_message(Val, Rec) -> set_meta(?will_message, Val, Rec). +-spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok. +clear_will_message_now(SessionId) when is_binary(SessionId) -> + transaction(fun() -> + case kv_restore(?session_tab, SessionId) of + [Metadata0] -> + Metadata = Metadata0#{?will_message => undefined}, + kv_persist(?session_tab, SessionId, Metadata), + ok; + [] -> + ok + end + end). + +-spec clear_will_message(t()) -> t(). +clear_will_message(Rec) -> + set_will_message(undefined, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 63963c242..37a86bda6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -90,7 +90,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). % Timers @@ -185,7 +186,8 @@ -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. --callback publish_will_message(t(), message()) -> ok. +-callback clear_will_message(t()) -> t(). +-callback publish_will_message_now(t(), message()) -> t(). %%-------------------------------------------------------------------- %% Create a Session @@ -645,6 +647,10 @@ run_hook(Name, Args) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(t(), message()) -> ok. -publish_will_message(Session, WillMsg) -> - ?IMPL(Session):publish_will_message(Session, WillMsg). +-spec clear_will_message(t()) -> t(). +clear_will_message(Session) -> + ?IMPL(Session):clear_will_message(Session). + +-spec publish_will_message_now(t(), message()) -> t(). +publish_will_message_now(Session, WillMsg) -> + ?IMPL(Session):publish_will_message_now(Session, WillMsg). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 95222a4b3..c9c4a71e7 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -108,7 +108,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). %% Export for CT @@ -808,10 +809,14 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(session(), message()) -> ok. -publish_will_message(#session{}, #message{} = WillMsg) -> +-spec clear_will_message(session()) -> session(). +clear_will_message(#session{} = Session) -> + Session. + +-spec publish_will_message_now(session(), message()) -> session(). +publish_will_message_now(#session{} = Session, #message{} = WillMsg) -> _ = emqx_broker:publish(WillMsg), - ok. + Session. %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index ec86cd237..93e9cd959 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, lookup_client/2, kickout_client/2, @@ -39,6 +40,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.7.0". + -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. kickout_client(Node, ClientId) -> rpc:call(Node, emqx_cm, kick_session, [ClientId]). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v3.erl b/apps/emqx/src/proto/emqx_cm_proto_v3.erl new file mode 100644 index 000000000..e0423cd5e --- /dev/null +++ b/apps/emqx/src/proto/emqx_cm_proto_v3.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + lookup_client/2, + kickout_client/2, + + get_chan_stats/2, + get_chan_info/2, + get_chann_conn_mod/2, + + takeover_session/2, + takeover_finish/2, + kick_session/3, + + %% Introduced in v3 + takeover_kick_session/2 +]). + +-include("bpapi.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). + +introduced_in() -> + "5.7.0". + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> + emqx_types:stats() | undefined | {badrpc, _}. +get_chan_stats(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> + emqx_types:infos() | undefined | {badrpc, _}. +get_chan_info(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> + module() | undefined | {badrpc, _}. +get_chann_conn_mod(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + none + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + %% NOTE: v5.3.0 + | {living, _ConnMod :: atom(), emqx_session:session()} + | {expired | persistent, emqx_session:session()} + | {badrpc, _}. +takeover_session(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). + +-spec takeover_finish(module(), emqx_cm:chan_pid()) -> + {ok, emqx_types:takeover_data()} + | {ok, list(emqx_types:deliver())} + | {error, term()} + | {badrpc, _}. +takeover_finish(ConnMod, ChanPid) -> + erpc:call( + node(ChanPid), + emqx_cm, + takeover_finish, + [ConnMod, ChanPid], + ?T_TAKEOVER * 2 + ). + +-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}. +kick_session(Action, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2). + +%%-------------------------------------------------------------------------------- +%% Introduced in v3 +%%-------------------------------------------------------------------------------- + +-spec takeover_kick_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + ok | {badrpc, _}. +takeover_kick_session(ClientId, ChanPid) -> + rpc:call( + node(ChanPid), emqx_cm, do_takeover_kick_session_v3, [ClientId, ChanPid], ?T_TAKEOVER * 2 + ). diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 6a3eea5d9..65b4364d0 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -150,6 +150,7 @@ when work_dir := file:name() }. start(Apps, SuiteOpts = #{work_dir := WorkDir}) -> + emqx_common_test_helpers:clear_screen(), % 1. Prepare appspec instructions AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps], % 2. Load every app so that stuff scanning attributes of loaded modules works diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 088023ec2..674e1a8d9 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -77,6 +77,7 @@ init_per_group(persistence_enabled, Config) -> " enable = true\n" " last_alive_update_interval = 100ms\n" " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" "}"}, {persistence, ds} | Config @@ -1241,15 +1242,13 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). -%% Check that we get a single will message when the client disconnects with a non -%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, -%% QoS = 1. -t_will_message1(Config) -> +%% Check that we don't get a will message when the client disconnects with success reason +%% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1. +t_no_will_message(Config) -> ConnFun = ?config(conn_fun, Config), WillTopic = ?config(topic, Config), WillPayload = <<"will message">>, ClientId = ?config(client_id, Config), - ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000), ?check_trace( #{timetrap => 15_000}, @@ -1262,26 +1261,77 @@ t_will_message1(Config) -> {will_topic, WillTopic}, {will_payload, WillPayload}, {will_qos, 1}, - {will_props, #{'Will-Delay-Interval' => 1}} + {will_props, #{'Will-Delay-Interval' => 0}} | Config ]), - {{ok, _}, {ok, _}} = - ?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}), - ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + {ok, _} = emqtt:ConnFun(Client), + ok = emqtt:disconnect(Client, ?RC_SUCCESS), - ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000), - %% No duplicates - ?assertNotReceive({deliver, WillTopic, _}, 1_000), + %% No will message + ?assertNotReceive({deliver, WillTopic, _}, 5_000), ok end, [] ), ok. -t_will_message1(init, Config) -> - Config; -t_will_message1('end', _Config) -> - ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}), + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message1(Config) -> + do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}), + ok. + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message2(Config) -> + do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}), + ok. + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message3(Config) -> + do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}), + ok. + +do_t_will_message(Config, Opts) -> + #{ + session_expiry := SessionExpiry, + will_delay := WillDelay + } = Opts, + ConnFun = ?config(conn_fun, Config), + WillTopic = ?config(topic, Config), + WillPayload = <<"will message">>, + ClientId = ?config(client_id, Config), + + ?check_trace( + #{timetrap => 15_000}, + begin + ok = emqx:subscribe(WillTopic, #{qos => 2}), + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => SessionExpiry}}, + {will_topic, WillTopic}, + {will_payload, WillPayload}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => WillDelay}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client), + ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + + ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000), + %% No duplicates + ?assertNotReceive({deliver, WillTopic, _}, 100), + + ok + end, + [] + ), ok. get_topicwise_order(Msgs) -> @@ -1311,7 +1361,3 @@ pick_respective_msgs(MsgRefs, Msgs) -> debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), ct:pal("*** State:~n~p", [Info]). - -on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) -> - ?tp(client_connack, #{}), - ok. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 4cc9f29bf..c9fdeed51 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -define(CNT, 100). -define(SLEEP, 10). @@ -32,25 +33,15 @@ all() -> [ - {group, mqttv3}, - {group, mqttv5} + {group, persistence_disabled}, + {group, persistence_enabled} ]. -init_per_suite(Config) -> - Apps = emqx_cth_suite:start( - [emqx], - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - emqx_logger:set_log_level(debug), - [{apps, Apps} | Config]. - -end_per_suite(Config) -> - Apps = ?config(apps, Config), - ok = emqx_cth_suite:stop(Apps), - ok. - groups() -> + MQTTGroups = [{group, G} || G <- [mqttv3, mqttv5]], [ + {persistence_enabled, MQTTGroups}, + {persistence_disabled, MQTTGroups}, {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -67,11 +58,55 @@ tc_v5_only() -> t_takeover_session_then_abnormal_disconnect_2 ]. +init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(persistence_enabled = Group, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence = {\n" + " enable = true\n" + " last_alive_update_interval = 100ms\n" + " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" + "}\n"} + ], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + emqx_logger:set_log_level(debug), + [ + {apps, Apps}, + {persistence_enabled, true} + | Config + ]; +init_per_group(persistence_disabled = Group, Config) -> + Apps = emqx_cth_suite:start( + [{emqx, "session_persistence.enable = false"}], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + emqx_logger:set_log_level(debug), + [ + {apps, Apps}, + {persistence_enabled, false} + | Config + ]; init_per_group(mqttv3, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); init_per_group(mqttv5, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}). +end_per_group(Group, Config) when + Group =:= persistence_disabled; + Group =:= persistence_enabled +-> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok; end_per_group(_Group, _Config) -> ok. @@ -97,19 +132,34 @@ t_takeover(Config) -> ok = timer:sleep(?SLEEP * 2), meck:passthrough([Arg]) end), + meck:expect(emqx_cm, takeover_kick, fun(Arg) -> + %% trigger more complex takeover conditions during 2-phase takeover protocol: + %% when messages are accumulated in 2 processes simultaneously, + %% and need to be properly ordered / deduplicated after the protocol commences. + ok = timer:sleep(?SLEEP * 2), + meck:passthrough([Arg]) + end), Commands = - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ - [{fun stop_client/1, []}], + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}, + {fun maybe_wait_subscriptions/1, []}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + {fun stop_client/1, []} + ]), + Sleep = + case ?config(persistence_enabled, Config) of + true -> 1_500; + false -> ?SLEEP + end, FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep}, Commands ), @@ -117,7 +167,7 @@ t_takeover(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), ?assertReceive({'EXIT', CPid2, normal}), - Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)], ct:pal("middle: ~p", [Middle]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), @@ -141,30 +191,36 @@ t_takeover_willmsg(Config) -> {will_qos, 0} ], Commands = - %% GIVEN client connect with will message - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + lists:flatten([ + %% GIVEN client connect with will message + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], %% WHEN client reconnect with clean_start = false - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), - Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + Sleep = + case ?config(persistence_enabled, Config) of + true -> 2_000; + false -> ?SLEEP + end, + Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), assert_messages_missed(AllMsgs, ReceivedNoWill), @@ -297,7 +353,7 @@ t_no_takeover_with_delayed_willmsg(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -312,24 +368,25 @@ t_no_takeover_with_delayed_willmsg(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + lists:flatten( [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [<>, WillTopic, ?QOS_1, []]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] + ] + ), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(Client1Msgs, Received), {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>), @@ -369,15 +426,15 @@ t_session_expire_with_delayed_willmsg(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + lists:flatten([ + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -388,7 +445,7 @@ t_session_expire_with_delayed_willmsg(Config) -> Commands ), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), ?assertNot(IsWill), @@ -404,7 +461,15 @@ t_session_expire_with_delayed_willmsg(Config) -> ?assertNot(IsWill1), ?assertEqual([], ReceivedNoWill1), %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. - Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + SessionSleep = + case ?config(persistence_enabled, Config) of + true -> + %% Session GC uses a larger, safer cutoff time. + 10_000; + false -> + 5_000 + end, + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)], {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>), ?assertEqual([], ReceivedNoWill2), ?assert(IsWill12), @@ -493,25 +558,23 @@ t_takeover_before_session_expire(Config) -> Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, %% WHEN: client session is taken over within 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -540,7 +603,7 @@ t_takeover_session_then_normal_disconnect(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -552,40 +615,42 @@ t_takeover_session_then_normal_disconnect(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [ + <>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], %% WHEN: client disconnect normally. emqtt:disconnect(CPid2, ?RC_SUCCESS), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = Received1 ++ Received2, ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), %% THEN: willmsg is not published. @@ -600,7 +665,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -612,26 +677,24 @@ t_takeover_session_then_abnormal_disconnect(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and will-delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + lists:flatten([ + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 10s > session expiry 3s. + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>, WillTopic, ?QOS_1, [] + ]}, + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -643,16 +706,26 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], %% WHEN: client disconnect abnormally emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = Received1 ++ Received2, ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), %% THEN: willmsg is not published before session expiry ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), - Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], - {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + SessionSleep = + case ?config(persistence_enabled, Config) of + true -> + %% Session GC uses a larger, safer cutoff time (GC interval x 3) + 10_000; + false -> + 3_000 + end, + Received3 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received3, <<"willpayload_delay10">>), %% AND THEN: willmsg is published after session expiry ?assert(IsWill1), ?assertEqual([], ReceivedNoWill1), @@ -866,19 +939,39 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) -> end), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. +maybe_wait_subscriptions(Ctx = #{persistence_enabled := true, client := CPids}) -> + ok = do_wait_subscription(CPids), + Ctx; +maybe_wait_subscriptions(Ctx) -> + Ctx. + +do_wait_subscription([]) -> + ok; +do_wait_subscription([CPid | Rest]) -> + try emqtt:subscriptions(CPid) of + [] -> + ok = timer:sleep(rand:uniform(?SLEEP)), + do_wait_subscription([CPid | Rest]); + [_ | _] -> + do_wait_subscription(Rest) + catch + exit:{noproc, _} -> + ok + end. + kick_client(Ctx, ClientId) -> ok = emqx_cm:kick_session(ClientId), Ctx. publish_msg(Ctx, Msg) -> ok = timer:sleep(rand:uniform(?SLEEP)), - case emqx:publish(Msg) of + case emqx:publish(Msg#message{timestamp = emqx_message:timestamp_now()}) of [] -> publish_msg(Ctx, Msg); [_ | _] -> Ctx end. -stop_client(Ctx = #{client := [CPid | _]}) -> - ok = timer:sleep(?SLEEP), +stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) -> + ok = timer:sleep(Sleep), ok = emqtt:stop(CPid), Ctx. @@ -904,14 +997,18 @@ assert_messages_missed(Ls1, Ls2) -> error end. -assert_messages_order([], []) -> +assert_messages_order([] = _Expected, _Received) -> ok; assert_messages_order([Msg | Expected], Received) -> %% Account for duplicate messages: case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of - {[], [#{payload := Mismatch} | _]} -> + {[], [#{timestamp := TSMismatch, payload := Mismatch} | _]} -> ct:fail("Message order is not correct, expected: ~p, received: ~p", [ - emqx_message:payload(Msg), Mismatch + { + emqx_utils_calendar:epoch_to_rfc3339(emqx_message:timestamp(Msg)), + emqx_message:payload(Msg) + }, + {emqx_utils_calendar:epoch_to_rfc3339(TSMismatch), Mismatch} ]), error; {_Matching, Rest} ->