From 0f426e6e77273bc18640698defaa0edc856b1cdf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 19 Mar 2024 15:45:52 -0300 Subject: [PATCH] feat(ds): make durable sessions handle will messages Fixes https://emqx.atlassian.net/browse/EMQX-10431 --- .../emqx_persistent_session_ds_SUITE.erl | 73 +++- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/src/emqx_channel.erl | 90 +++-- apps/emqx/src/emqx_cm.erl | 77 +++- apps/emqx/src/emqx_persistent_session_ds.erl | 58 ++- apps/emqx/src/emqx_persistent_session_ds.hrl | 1 + .../emqx_persistent_session_ds_gc_worker.erl | 110 +++++- .../src/emqx_persistent_session_ds_state.erl | 28 +- apps/emqx/src/emqx_session.erl | 16 +- apps/emqx/src/emqx_session_mem.erl | 13 +- apps/emqx/src/proto/emqx_cm_proto_v2.erl | 4 + apps/emqx/src/proto/emqx_cm_proto_v3.erl | 106 ++++++ apps/emqx/test/emqx_cth_suite.erl | 1 + .../test/emqx_persistent_session_SUITE.erl | 86 ++++- apps/emqx/test/emqx_takeover_SUITE.erl | 343 +++++++++++------- 15 files changed, 789 insertions(+), 218 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_cm_proto_v3.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 1f358e3c0..fae54d612 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -25,7 +25,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -144,6 +143,7 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> start_client(Opts0 = #{}) -> Defaults = #{ + port => 1883, proto_ver => v5, properties => #{'Session-Expiry-Interval' => 300} }, @@ -190,6 +190,23 @@ list_all_subscriptions(Node) -> list_all_pubranges(Node) -> erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). +session_open(Node, ClientId) -> + ClientInfo = #{}, + ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, + erpc:call( + Node, + emqx_persistent_session_ds, + session_open, + [ClientId, ClientInfo, ConnInfo, WillMsg] + ). + +force_last_alive_at(ClientId, Time) -> + {ok, S0} = emqx_persistent_session_ds_state:open(ClientId), + S = emqx_persistent_session_ds_state:set_last_alive_at(Time, S0), + _ = emqx_persistent_session_ds_state:commit(S), + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -244,11 +261,7 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{peername => {undefined, undefined}}, - WillMsg = undefined, - Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] - ), + Session = session_open(Node1, ClientId), ?assertMatch( #{SubTopicFilter := #{}}, emqx_session:info(subscriptions, Session) @@ -322,11 +335,7 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{peername => {undefined, undefined}}, - WillMsg = undefined, - Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] - ), + Session = session_open(Node1, ClientId), ?assertEqual( #{}, emqx_session:info(subscriptions, Session) @@ -555,6 +564,7 @@ t_session_gc(Config) -> ), %% Clients are still alive; no session is garbage collected. + ?tp(notice, "waiting for gc", #{}), ?assertMatch( {ok, _}, ?block_until( @@ -567,9 +577,11 @@ t_session_gc(Config) -> ), ?assertMatch([_, _, _], list_all_sessions(Node1), sessions), ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions), + ?tp(notice, "gc ran", #{}), %% Now we disconnect 2 of them; only those should be GC'ed. + ?tp(notice, "disconnecting client1", #{}), ?assertMatch( {ok, {ok, _}}, ?wait_async_action( @@ -674,3 +686,42 @@ t_session_replay_retry(_Config) -> [maps:with([topic, payload, qos], P) || P <- Pubs0], [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2] ). + +%% Check that we send will messages when performing GC without relying on timers set by +%% the channel process. +t_session_gc_will_message(_Config) -> + ?check_trace( + #{timetrap => 10_000}, + begin + WillTopic = <<"will/t">>, + ok = emqx:subscribe(WillTopic, #{qos => 2}), + ClientId = <<"will_msg_client">>, + Client = start_client(#{ + clientid => ClientId, + will_topic => WillTopic, + will_payload => <<"will payload">>, + will_qos => 0, + will_props => #{'Will-Delay-Interval' => 300} + }), + {ok, _} = emqtt:connect(Client), + %% Use reason code =/= `?RC_SUCCESS' to allow will message + {ok, {ok, _}} = + ?wait_async_action( + emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + #{?snk_kind := emqx_cm_clean_down} + ), + ?assertNotReceive({deliver, WillTopic, _}), + %% Set fake `last_alive_at' to trigger immediate will message. + force_last_alive_at(ClientId, _Time = 0), + {ok, {ok, _}} = + ?wait_async_action( + emqx_persistent_session_ds_gc_worker:check_session(ClientId), + #{?snk_kind := session_gc_published_will_msg} + ), + ?assertReceive({deliver, WillTopic, _}), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 71cfccdea..b35808911 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -12,6 +12,7 @@ {emqx_broker,1}. {emqx_cm,1}. {emqx_cm,2}. +{emqx_cm,3}. {emqx_conf,1}. {emqx_conf,2}. {emqx_conf,3}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 59786afea..0a0f37bcb 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -64,6 +64,12 @@ maybe_nack/1 ]). +%% Export for DS session GC worker and session implementations +-export([ + will_delay_interval/1, + prepare_will_message_for_publishing/2 +]). + %% Exports for CT -export([set_field/3]). @@ -885,9 +891,10 @@ do_unsubscribe( %%-------------------------------------------------------------------- %% MQTT-v5.0: 3.14.4 DISCONNECT Actions -maybe_clean_will_msg(?RC_SUCCESS, Channel) -> +maybe_clean_will_msg(?RC_SUCCESS, Channel = #channel{session = Session0}) -> %% [MQTT-3.14.4-3] - Channel#channel{will_msg = undefined}; + Session = emqx_session:clear_will_message(Session0), + Channel#channel{will_msg = undefined, session = Session}; maybe_clean_will_msg(_ReasonCode, Channel) -> Channel. @@ -1204,6 +1211,9 @@ handle_call( ), Channel0 = maybe_publish_will_msg(takenover, Channel), disconnect_and_shutdown(takenover, AllPendings, Channel0); +handle_call(takeover_kick, Channel) -> + Channel0 = maybe_publish_will_msg(takenover, Channel), + disconnect_and_shutdown(takenover, ok, Channel0); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; handle_call( @@ -2301,17 +2311,17 @@ maybe_publish_will_msg( Channel; maybe_publish_will_msg( _Reason, - Channel = #channel{ + Channel0 = #channel{ conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId} } ) -> %% Unconditionally publish will message for MQTT 3.1.1 ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), - _ = publish_will_msg(Channel), - Channel#channel{will_msg = undefined}; + Channel = publish_will_msg(Channel0), + remove_willmsg(Channel); maybe_publish_will_msg( Reason, - Channel = #channel{ + Channel0 = #channel{ conninfo = #{clientid := ClientId} } ) when @@ -2329,12 +2339,20 @@ maybe_publish_will_msg( %% d. internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), - _ = publish_will_msg(Channel), - remove_willmsg(Channel); + %% NOTE! For durable sessions, `?chan_terminating' does NOT imply that the session is + %% gone. + case is_durable_session(Channel0) andalso Reason =:= ?chan_terminating of + false -> + ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), + + Channel = publish_will_msg(Channel0), + remove_willmsg(Channel); + true -> + Channel0 + end; maybe_publish_will_msg( takenover, - Channel = #channel{ + Channel0 = #channel{ will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2352,7 +2370,8 @@ maybe_publish_will_msg( case will_delay_interval(WillMsg) of 0 -> ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), - _ = publish_will_msg(Channel); + Channel = publish_will_msg(Channel0), + ok; I when I > 0 -> %% @NOTE Non-normative comment in MQTT 5.0 spec %% """ @@ -2361,12 +2380,13 @@ maybe_publish_will_msg( %% before the Will Message is published. %% """ ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), + Channel = Channel0, skip end, remove_willmsg(Channel); maybe_publish_will_msg( Reason, - Channel = #channel{ + Channel0 = #channel{ will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2377,11 +2397,11 @@ maybe_publish_will_msg( ?tp(debug, maybe_publish_will_msg_other_publish, #{ clientid => ClientId, reason => Reason }), - _ = publish_will_msg(Channel), + Channel = publish_will_msg(Channel0), remove_willmsg(Channel); I when I > 0 -> ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), - ensure_timer(will_message, timer:seconds(I), Channel) + ensure_timer(will_message, timer:seconds(I), Channel0) end. will_delay_interval(WillMsg) -> @@ -2394,15 +2414,15 @@ will_delay_interval(WillMsg) -> publish_will_msg( #channel{ session = Session, - clientinfo = ClientInfo = #{mountpoint := MountPoint}, + clientinfo = ClientInfo, will_msg = Msg = #message{topic = Topic} - } + } = Channel ) -> - Action = authz_action(Msg), - PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, - ClientBanned = emqx_banned:check(ClientInfo), - case PublishingDisallowed orelse ClientBanned of - true -> + case prepare_will_message_for_publishing(ClientInfo, Msg) of + {ok, PreparedMessage} -> + NSession = emqx_session:publish_will_message_now(Session, PreparedMessage), + Channel#channel{session = NSession}; + {error, #{client_banned := ClientBanned, publishing_disallowed := PublishingDisallowed}} -> ?tp( warning, last_will_testament_publish_denied, @@ -2412,12 +2432,23 @@ publish_will_msg( publishing_disallowed => PublishingDisallowed } ), - ok; + Channel + end. + +prepare_will_message_for_publishing( + ClientInfo = #{mountpoint := MountPoint}, + Msg = #message{topic = Topic} +) -> + Action = authz_action(Msg), + PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, + ClientBanned = emqx_banned:check(ClientInfo), + case PublishingDisallowed orelse ClientBanned of + true -> + {error, #{client_banned => ClientBanned, publishing_disallowed => PublishingDisallowed}}; false -> NMsg = emqx_mountpoint:mount(MountPoint, Msg), - NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)}, - ok = emqx_session:publish_will_message(Session, NMsg2), - ok + PreparedMessage = NMsg#message{timestamp = emqx_message:timestamp_now()}, + {ok, PreparedMessage} end. %%-------------------------------------------------------------------- @@ -2529,6 +2560,15 @@ remove_willmsg(Channel = #channel{timers = Timers}) -> timers = maps:remove(will_message, Timers) } end. + +is_durable_session(#channel{session = Session}) -> + case emqx_session:info(impl, Session) of + emqx_persistent_session_ds -> + true; + _ -> + false + end. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 072611949..421452554 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -53,7 +53,8 @@ takeover_session_begin/1, takeover_session_end/1, kick_session/1, - kick_session/2 + kick_session/2, + takeover_kick/1 ]). -export([ @@ -100,6 +101,7 @@ takeover_session/2, takeover_finish/2, do_kick_session/3, + do_takeover_kick_session_v3/2, do_get_chan_info/2, do_get_chan_stats/2, do_get_chann_conn_mod/2 @@ -122,6 +124,8 @@ -type takeover_state() :: {_ConnMod :: module(), _ChanPid :: pid()}. +-define(BPAPI_NAME, emqx_cm). + -define(CHAN_STATS, [ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, @@ -352,6 +356,38 @@ pick_channel(ClientId) -> ChanPid end. +%% Used by `emqx_persistent_session_ds' +-spec takeover_kick(emqx_types:clientid()) -> ok. +takeover_kick(ClientId) -> + case lookup_channels(ClientId) of + [] -> + ok; + ChanPids -> + lists:foreach( + fun(Pid) -> + do_takeover_session(ClientId, Pid) + end, + ChanPids + ) + end. + +%% Used by `emqx_persistent_session_ds'. +%% We stop any running channels with reason `takenover' so that correct reason codes and +%% will message processing may take place. For older BPAPI nodes, we don't have much +%% choice other than calling the old `discard_session' code. +do_takeover_session(ClientId, Pid) -> + Node = node(Pid), + case emqx_bpapi:supported_version(Node, ?BPAPI_NAME) of + undefined -> + %% Race: node (re)starting? Assume v2. + discard_session(ClientId, Pid); + Vsn when Vsn =< 2 -> + discard_session(ClientId, Pid); + _Vsn -> + takeover_kick_session(ClientId, Pid) + end. + +%% Used only by `emqx_session_mem' takeover_finish(ConnMod, ChanPid) -> request_stepdown( {takeover, 'end'}, @@ -360,6 +396,7 @@ takeover_finish(ConnMod, ChanPid) -> ). %% @doc RPC Target @ emqx_cm_proto_v2:takeover_session/2 +%% Used only by `emqx_session_mem' takeover_session(ClientId, Pid) -> try do_takeover_begin(ClientId, Pid) @@ -415,7 +452,7 @@ discard_session(ClientId) when is_binary(ClientId) -> | {ok, emqx_session:t() | _ReplayContext} | {error, term()} when - Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. + Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick. request_stepdown(Action, ConnMod, Pid) -> Timeout = case Action == kick orelse Action == discard of @@ -496,7 +533,19 @@ do_kick_session(Action, ClientId, ChanPid) when node(ChanPid) =:= node() -> ok = request_stepdown(Action, ConnMod, ChanPid) end. -%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +%% @doc RPC Target for emqx_cm_proto_v3:takeover_kick_session/3 +-spec do_takeover_kick_session_v3(emqx_types:clientid(), chan_pid()) -> ok. +do_takeover_kick_session_v3(ClientId, ChanPid) when node(ChanPid) =:= node() -> + case do_get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = request_stepdown(takeover_kick, ConnMod, ChanPid) + end. + +%% @private This function is shared for session `kick' and `discard' (as the first arg +%% Action). kick_session(Action, ClientId, ChanPid) -> try wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid)) @@ -519,6 +568,28 @@ kick_session(Action, ClientId, ChanPid) -> ) end. +takeover_kick_session(ClientId, ChanPid) -> + try + wrap_rpc(emqx_cm_proto_v3:takeover_kick_session(ClientId, ChanPid)) + catch + Error:Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?SLOG( + error, + #{ + msg => "failed_to_kick_session_on_remote_node", + node => node(ChanPid), + action => takeover, + error => Error, + reason => Reason + }, + #{clientid => ClientId} + ) + end. + kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 72014bd37..cc861f71f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -68,7 +68,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). %% Managment APIs: @@ -93,7 +94,7 @@ -ifdef(TEST). -export([ - session_open/3, + session_open/4, list_all_sessions/0 ]). -endif. @@ -189,20 +190,20 @@ -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). -create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) -> - ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)). +create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> + ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)). -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> +open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we %% have disconnected channels holding onto session state. Ideally, we should %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. - ok = emqx_cm:discard_session(ClientID), - case session_open(ClientID, ConnInfo, MaybeWillMsg) of + ok = emqx_cm:takeover_kick(ClientID), + case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of Session0 = #{} -> Session = Session0#{props => Conf}, {true, ensure_timers(Session), []}; @@ -613,7 +614,8 @@ disconnect(Session = #{s := S0}, ConnInfo) -> {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. -terminate(_Reason, _Session = #{id := Id, s := S}) -> +terminate(_Reason, Session = #{id := Id, s := S}) -> + maybe_set_will_message_timer(Session), _ = emqx_persistent_session_ds_state:commit(S), ?tp(debug, persistent_session_ds_terminate, #{id => Id}), ok. @@ -685,9 +687,9 @@ sync(ClientId) -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) -> +-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, NewConnInfo, MaybeWillMsg) -> +session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -706,7 +708,8 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) -> maps:get(peername, NewConnInfo), S2 ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), - S = emqx_persistent_session_ds_state:commit(S4), + S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4), + S = emqx_persistent_session_ds_state:commit(S5), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -723,12 +726,13 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) -> -spec session_ensure_new( id(), + emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message()), emqx_session:conf() ) -> session(). -session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> +session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -751,7 +755,8 @@ session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> ] ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), - S = emqx_persistent_session_ds_state:commit(S5), + S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5), + S = emqx_persistent_session_ds_state:commit(S6), #{ id => Id, props => Conf, @@ -1208,10 +1213,29 @@ seqno_diff(?QOS_2, A, B) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(session(), message()) -> ok. -publish_will_message(_Session, #message{} = _WillMsg) -> - %% TODO - ok. +-spec clear_will_message(session()) -> session(). +clear_will_message(#{s := S0} = Session) -> + S = emqx_persistent_session_ds_state:clear_will_message(S0), + Session#{s := S}. + +-spec publish_will_message_now(session(), message()) -> session(). +publish_will_message_now(#{} = Session, WillMsg = #message{}) -> + _ = emqx_broker:publish(WillMsg), + clear_will_message(Session). + +maybe_set_will_message_timer(#{id := SessionId, s := S}) -> + case emqx_persistent_session_ds_state:get_will_message(S) of + #message{} = WillMsg -> + WillDelayInterval = emqx_channel:will_delay_interval(WillMsg), + WillDelayInterval > 0 andalso + emqx_persistent_session_ds_gc_worker:check_session_after( + SessionId, + timer:seconds(WillDelayInterval) + ), + ok; + _ -> + ok + end. %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 67ef485d5..56862dfa5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -76,5 +76,6 @@ -define(last_id, last_id). -define(peername, peername). -define(will_message, will_message). +-define(clientinfo, clientinfo). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index a4d1fe638..4f9accfb2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -25,7 +25,9 @@ %% API -export([ - start_link/0 + start_link/0, + check_session/1, + check_session_after/2 ]). %% `gen_server' API @@ -38,6 +40,7 @@ %% call/cast/info records -record(gc, {}). +-record(check_session, {id :: emqx_persistent_session_ds:id()}). %%-------------------------------------------------------------------------------- %% API @@ -46,6 +49,17 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec check_session(emqx_persistent_session_ds:id()) -> ok. +check_session(SessionId) -> + gen_server:cast(?MODULE, #check_session{id = SessionId}). + +-spec check_session_after(emqx_persistent_session_ds:id(), pos_integer()) -> ok. +check_session_after(SessionId, Time0) -> + #{bump_interval := BumpInterval} = gc_context(), + Time = max(Time0, BumpInterval), + _ = erlang:send_after(Time, ?MODULE, #check_session{id = SessionId}), + ok. + %%-------------------------------------------------------------------------------- %% `gen_server' API %%-------------------------------------------------------------------------------- @@ -58,6 +72,9 @@ init(_Opts) -> handle_call(_Call, _From, State) -> {reply, error, State}. +handle_cast(#check_session{id = SessionId}, State) -> + do_check_session(SessionId), + {noreply, State}; handle_cast(_Cast, State) -> {noreply, State}. @@ -65,6 +82,9 @@ handle_info(#gc{}, State) -> try_gc(), ensure_gc_timer(), {noreply, State}; +handle_info(#check_session{id = SessionId}, State) -> + do_check_session(SessionId), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -104,25 +124,65 @@ now_ms() -> erlang:system_time(millisecond). start_gc() -> + #{ + min_last_alive := MinLastAlive, + min_last_alive_will_msg := MinLastAliveWillMsg + } = gc_context(), + gc_loop( + MinLastAlive, MinLastAliveWillMsg, emqx_persistent_session_ds_state:make_session_iterator() + ). + +gc_context() -> GCInterval = emqx_config:get([session_persistence, session_gc_interval]), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), TimeThreshold = max(GCInterval, BumpInterval) * 3, - MinLastAlive = now_ms() - TimeThreshold, - gc_loop(MinLastAlive, emqx_persistent_session_ds_state:make_session_iterator()). + NowMS = now_ms(), + #{ + min_last_alive => NowMS - TimeThreshold, + %% For will messages, we don't need to be so strict as session GC (GC interval is + %% of the order of ~ 10 minutes by default, bump interval ~ 100 ms), otherwise + %% most will be sent very late. + min_last_alive_will_msg => NowMS - BumpInterval * 5, + time_threshold => TimeThreshold, + bump_interval => BumpInterval, + gc_interval => GCInterval + }. -gc_loop(MinLastAlive, It0) -> +gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) -> GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]), case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of {[], _It} -> ok; {Sessions, It} -> - [do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions], - gc_loop(MinLastAlive, It) + [ + do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) + || {SessionId, Metadata} <- Sessions + ], + gc_loop(MinLastAlive, MinLastAliveWillMsg, It) end. -do_gc(SessionId, MinLastAlive, Metadata) -> - #{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata, - case LastAliveAt + EI < MinLastAlive of +do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) -> + #{ + ?last_alive_at := LastAliveAt, + ?expiry_interval := EI, + ?will_message := MaybeWillMessage, + ?clientinfo := ClientInfo + } = Metadata, + IsExpired = LastAliveAt + EI < MinLastAlive, + case + should_send_will_message( + MaybeWillMessage, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg + ) + of + {true, PreparedMessage} -> + _ = emqx_broker:publish(PreparedMessage), + ok = emqx_persistent_session_ds_state:clear_will_message_now(SessionId), + ?tp(session_gc_published_will_msg, #{id => SessionId, msg => PreparedMessage}), + ok; + false -> + ok + end, + case IsExpired of true -> emqx_persistent_session_ds:destroy_session(SessionId), ?tp(debug, ds_session_gc_cleaned, #{ @@ -134,3 +194,35 @@ do_gc(SessionId, MinLastAlive, Metadata) -> false -> ok end. + +should_send_will_message( + undefined = _WillMsg, _ClientInfo, _IsExpired, _LastAliveAt, _MinLastAliveWillMsg +) -> + false; +should_send_will_message(WillMsg, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg) -> + WillDelayIntervalS = emqx_channel:will_delay_interval(WillMsg), + WillDelayInterval = timer:seconds(WillDelayIntervalS), + PastWillDelay = LastAliveAt + WillDelayInterval < MinLastAliveWillMsg, + case PastWillDelay orelse IsExpired of + true -> + case emqx_channel:prepare_will_message_for_publishing(ClientInfo, WillMsg) of + {ok, PreparedMessage} -> + {true, PreparedMessage}; + {error, _} -> + false + end; + false -> + false + end. + +do_check_session(SessionId) -> + case emqx_persistent_session_ds_state:print_session(SessionId) of + #{metadata := Metadata} -> + #{ + min_last_alive := MinLastAlive, + min_last_alive_will_msg := MinLastAliveWillMsg + } = gc_context(), + do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata); + _ -> + ok + end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 5de74fb06..28297964d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,7 +30,8 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_expiry_interval/1, set_expiry_interval/2]). --export([get_will_message/1, set_will_message/2]). +-export([get_clientinfo/1, set_clientinfo/2]). +-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). @@ -291,6 +292,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). +get_clientinfo(Rec) -> + get_meta(?clientinfo, Rec). + +-spec set_clientinfo(emqx_types:clientinfo(), t()) -> t(). +set_clientinfo(Val, Rec) -> + set_meta(?clientinfo, Val, Rec). + -spec get_will_message(t()) -> emqx_maybe:t(message()). get_will_message(Rec) -> get_meta(?will_message, Rec). @@ -299,6 +308,23 @@ get_will_message(Rec) -> set_will_message(Val, Rec) -> set_meta(?will_message, Val, Rec). +-spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok. +clear_will_message_now(SessionId) when is_binary(SessionId) -> + transaction(fun() -> + case kv_restore(?session_tab, SessionId) of + [Metadata0] -> + Metadata = Metadata0#{?will_message => undefined}, + kv_persist(?session_tab, SessionId, Metadata), + ok; + [] -> + ok + end + end). + +-spec clear_will_message(t()) -> t(). +clear_will_message(Rec) -> + set_will_message(undefined, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 63963c242..37a86bda6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -90,7 +90,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). % Timers @@ -185,7 +186,8 @@ -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. --callback publish_will_message(t(), message()) -> ok. +-callback clear_will_message(t()) -> t(). +-callback publish_will_message_now(t(), message()) -> t(). %%-------------------------------------------------------------------- %% Create a Session @@ -645,6 +647,10 @@ run_hook(Name, Args) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(t(), message()) -> ok. -publish_will_message(Session, WillMsg) -> - ?IMPL(Session):publish_will_message(Session, WillMsg). +-spec clear_will_message(t()) -> t(). +clear_will_message(Session) -> + ?IMPL(Session):clear_will_message(Session). + +-spec publish_will_message_now(t(), message()) -> t(). +publish_will_message_now(Session, WillMsg) -> + ?IMPL(Session):publish_will_message_now(Session, WillMsg). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 95222a4b3..c9c4a71e7 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -108,7 +108,8 @@ %% Will message handling -export([ - publish_will_message/2 + clear_will_message/1, + publish_will_message_now/2 ]). %% Export for CT @@ -808,10 +809,14 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %% Will message handling %%-------------------------------------------------------------------- --spec publish_will_message(session(), message()) -> ok. -publish_will_message(#session{}, #message{} = WillMsg) -> +-spec clear_will_message(session()) -> session(). +clear_will_message(#session{} = Session) -> + Session. + +-spec publish_will_message_now(session(), message()) -> session(). +publish_will_message_now(#session{} = Session, #message{} = WillMsg) -> _ = emqx_broker:publish(WillMsg), - ok. + Session. %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index ec86cd237..93e9cd959 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, lookup_client/2, kickout_client/2, @@ -39,6 +40,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.7.0". + -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. kickout_client(Node, ClientId) -> rpc:call(Node, emqx_cm, kick_session, [ClientId]). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v3.erl b/apps/emqx/src/proto/emqx_cm_proto_v3.erl new file mode 100644 index 000000000..e0423cd5e --- /dev/null +++ b/apps/emqx/src/proto/emqx_cm_proto_v3.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + lookup_client/2, + kickout_client/2, + + get_chan_stats/2, + get_chan_info/2, + get_chann_conn_mod/2, + + takeover_session/2, + takeover_finish/2, + kick_session/3, + + %% Introduced in v3 + takeover_kick_session/2 +]). + +-include("bpapi.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). + +introduced_in() -> + "5.7.0". + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> + emqx_types:stats() | undefined | {badrpc, _}. +get_chan_stats(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> + emqx_types:infos() | undefined | {badrpc, _}. +get_chan_info(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> + module() | undefined | {badrpc, _}. +get_chann_conn_mod(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + none + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + %% NOTE: v5.3.0 + | {living, _ConnMod :: atom(), emqx_session:session()} + | {expired | persistent, emqx_session:session()} + | {badrpc, _}. +takeover_session(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). + +-spec takeover_finish(module(), emqx_cm:chan_pid()) -> + {ok, emqx_types:takeover_data()} + | {ok, list(emqx_types:deliver())} + | {error, term()} + | {badrpc, _}. +takeover_finish(ConnMod, ChanPid) -> + erpc:call( + node(ChanPid), + emqx_cm, + takeover_finish, + [ConnMod, ChanPid], + ?T_TAKEOVER * 2 + ). + +-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}. +kick_session(Action, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2). + +%%-------------------------------------------------------------------------------- +%% Introduced in v3 +%%-------------------------------------------------------------------------------- + +-spec takeover_kick_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + ok | {badrpc, _}. +takeover_kick_session(ClientId, ChanPid) -> + rpc:call( + node(ChanPid), emqx_cm, do_takeover_kick_session_v3, [ClientId, ChanPid], ?T_TAKEOVER * 2 + ). diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 6a3eea5d9..65b4364d0 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -150,6 +150,7 @@ when work_dir := file:name() }. start(Apps, SuiteOpts = #{work_dir := WorkDir}) -> + emqx_common_test_helpers:clear_screen(), % 1. Prepare appspec instructions AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps], % 2. Load every app so that stuff scanning attributes of loaded modules works diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 088023ec2..674e1a8d9 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -77,6 +77,7 @@ init_per_group(persistence_enabled, Config) -> " enable = true\n" " last_alive_update_interval = 100ms\n" " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" "}"}, {persistence, ds} | Config @@ -1241,15 +1242,13 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). -%% Check that we get a single will message when the client disconnects with a non -%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, -%% QoS = 1. -t_will_message1(Config) -> +%% Check that we don't get a will message when the client disconnects with success reason +%% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1. +t_no_will_message(Config) -> ConnFun = ?config(conn_fun, Config), WillTopic = ?config(topic, Config), WillPayload = <<"will message">>, ClientId = ?config(client_id, Config), - ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000), ?check_trace( #{timetrap => 15_000}, @@ -1262,26 +1261,77 @@ t_will_message1(Config) -> {will_topic, WillTopic}, {will_payload, WillPayload}, {will_qos, 1}, - {will_props, #{'Will-Delay-Interval' => 1}} + {will_props, #{'Will-Delay-Interval' => 0}} | Config ]), - {{ok, _}, {ok, _}} = - ?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}), - ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + {ok, _} = emqtt:ConnFun(Client), + ok = emqtt:disconnect(Client, ?RC_SUCCESS), - ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000), - %% No duplicates - ?assertNotReceive({deliver, WillTopic, _}, 1_000), + %% No will message + ?assertNotReceive({deliver, WillTopic, _}, 5_000), ok end, [] ), ok. -t_will_message1(init, Config) -> - Config; -t_will_message1('end', _Config) -> - ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}), + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message1(Config) -> + do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}), + ok. + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message2(Config) -> + do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}), + ok. + +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message3(Config) -> + do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}), + ok. + +do_t_will_message(Config, Opts) -> + #{ + session_expiry := SessionExpiry, + will_delay := WillDelay + } = Opts, + ConnFun = ?config(conn_fun, Config), + WillTopic = ?config(topic, Config), + WillPayload = <<"will message">>, + ClientId = ?config(client_id, Config), + + ?check_trace( + #{timetrap => 15_000}, + begin + ok = emqx:subscribe(WillTopic, #{qos => 2}), + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => SessionExpiry}}, + {will_topic, WillTopic}, + {will_payload, WillPayload}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => WillDelay}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client), + ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + + ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000), + %% No duplicates + ?assertNotReceive({deliver, WillTopic, _}, 100), + + ok + end, + [] + ), ok. get_topicwise_order(Msgs) -> @@ -1311,7 +1361,3 @@ pick_respective_msgs(MsgRefs, Msgs) -> debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), ct:pal("*** State:~n~p", [Info]). - -on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) -> - ?tp(client_connack, #{}), - ok. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 4cc9f29bf..c9fdeed51 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -define(CNT, 100). -define(SLEEP, 10). @@ -32,25 +33,15 @@ all() -> [ - {group, mqttv3}, - {group, mqttv5} + {group, persistence_disabled}, + {group, persistence_enabled} ]. -init_per_suite(Config) -> - Apps = emqx_cth_suite:start( - [emqx], - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - emqx_logger:set_log_level(debug), - [{apps, Apps} | Config]. - -end_per_suite(Config) -> - Apps = ?config(apps, Config), - ok = emqx_cth_suite:stop(Apps), - ok. - groups() -> + MQTTGroups = [{group, G} || G <- [mqttv3, mqttv5]], [ + {persistence_enabled, MQTTGroups}, + {persistence_disabled, MQTTGroups}, {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -67,11 +58,55 @@ tc_v5_only() -> t_takeover_session_then_abnormal_disconnect_2 ]. +init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(persistence_enabled = Group, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence = {\n" + " enable = true\n" + " last_alive_update_interval = 100ms\n" + " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" + "}\n"} + ], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + emqx_logger:set_log_level(debug), + [ + {apps, Apps}, + {persistence_enabled, true} + | Config + ]; +init_per_group(persistence_disabled = Group, Config) -> + Apps = emqx_cth_suite:start( + [{emqx, "session_persistence.enable = false"}], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + emqx_logger:set_log_level(debug), + [ + {apps, Apps}, + {persistence_enabled, false} + | Config + ]; init_per_group(mqttv3, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); init_per_group(mqttv5, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}). +end_per_group(Group, Config) when + Group =:= persistence_disabled; + Group =:= persistence_enabled +-> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok; end_per_group(_Group, _Config) -> ok. @@ -97,19 +132,34 @@ t_takeover(Config) -> ok = timer:sleep(?SLEEP * 2), meck:passthrough([Arg]) end), + meck:expect(emqx_cm, takeover_kick, fun(Arg) -> + %% trigger more complex takeover conditions during 2-phase takeover protocol: + %% when messages are accumulated in 2 processes simultaneously, + %% and need to be properly ordered / deduplicated after the protocol commences. + ok = timer:sleep(?SLEEP * 2), + meck:passthrough([Arg]) + end), Commands = - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ - [{fun stop_client/1, []}], + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}, + {fun maybe_wait_subscriptions/1, []}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + {fun stop_client/1, []} + ]), + Sleep = + case ?config(persistence_enabled, Config) of + true -> 1_500; + false -> ?SLEEP + end, FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep}, Commands ), @@ -117,7 +167,7 @@ t_takeover(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), ?assertReceive({'EXIT', CPid2, normal}), - Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)], ct:pal("middle: ~p", [Middle]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), @@ -141,30 +191,36 @@ t_takeover_willmsg(Config) -> {will_qos, 0} ], Commands = - %% GIVEN client connect with will message - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + lists:flatten([ + %% GIVEN client connect with will message + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], %% WHEN client reconnect with clean_start = false - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), - Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + Sleep = + case ?config(persistence_enabled, Config) of + true -> 2_000; + false -> ?SLEEP + end, + Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), assert_messages_missed(AllMsgs, ReceivedNoWill), @@ -297,7 +353,7 @@ t_no_takeover_with_delayed_willmsg(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -312,24 +368,25 @@ t_no_takeover_with_delayed_willmsg(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + lists:flatten( [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [<>, WillTopic, ?QOS_1, []]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] + ] + ), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(Client1Msgs, Received), {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>), @@ -369,15 +426,15 @@ t_session_expire_with_delayed_willmsg(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + lists:flatten([ + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -388,7 +445,7 @@ t_session_expire_with_delayed_willmsg(Config) -> Commands ), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), ?assertNot(IsWill), @@ -404,7 +461,15 @@ t_session_expire_with_delayed_willmsg(Config) -> ?assertNot(IsWill1), ?assertEqual([], ReceivedNoWill1), %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. - Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + SessionSleep = + case ?config(persistence_enabled, Config) of + true -> + %% Session GC uses a larger, safer cutoff time. + 10_000; + false -> + 5_000 + end, + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)], {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>), ?assertEqual([], ReceivedNoWill2), ?assert(IsWill12), @@ -493,25 +558,23 @@ t_takeover_before_session_expire(Config) -> Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, %% WHEN: client session is taken over within 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -540,7 +603,7 @@ t_takeover_session_then_normal_disconnect(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -552,40 +615,42 @@ t_takeover_session_then_normal_disconnect(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ + lists:flatten([ + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []}, + {fun start_client/5, [ + <>, WillTopic, ?QOS_1, [] + ]}, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> %% and delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun maybe_wait_subscriptions/1, []} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), apply(Fun, [Ctx | Args]) end, - #{}, + #{persistence_enabled => ?config(persistence_enabled, Config)}, Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], %% WHEN: client disconnect normally. emqtt:disconnect(CPid2, ?RC_SUCCESS), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = Received1 ++ Received2, ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), %% THEN: willmsg is not published. @@ -600,7 +665,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), - WillTopic = <>/binary>>, + WillTopic = <>, Client1Msgs = messages(ClientId, 0, 10), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -612,26 +677,24 @@ t_takeover_session_then_abnormal_disconnect(Config) -> {properties, #{'Session-Expiry-Interval' => 3}} ], Commands = - %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and will-delay-interval 10s > session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ - [ - {fun start_client/5, [ - <>/binary>>, WillTopic, ?QOS_1, [] - ]} - ] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [ - %% avoid two clients race for session takeover - { - fun(CTX) -> - timer:sleep(100), - CTX - end, - [] - } - ] ++ - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + lists:flatten([ + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 10s > session expiry 3s. + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}, + {fun start_client/5, [ + <>, WillTopic, ?QOS_1, [] + ]}, + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + }, + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]} + ]), FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -643,16 +706,26 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], %% WHEN: client disconnect abnormally emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = Received1 ++ Received2, ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), %% THEN: willmsg is not published before session expiry ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), - Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], - {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + SessionSleep = + case ?config(persistence_enabled, Config) of + true -> + %% Session GC uses a larger, safer cutoff time (GC interval x 3) + 10_000; + false -> + 3_000 + end, + Received3 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received3, <<"willpayload_delay10">>), %% AND THEN: willmsg is published after session expiry ?assert(IsWill1), ?assertEqual([], ReceivedNoWill1), @@ -866,19 +939,39 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) -> end), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. +maybe_wait_subscriptions(Ctx = #{persistence_enabled := true, client := CPids}) -> + ok = do_wait_subscription(CPids), + Ctx; +maybe_wait_subscriptions(Ctx) -> + Ctx. + +do_wait_subscription([]) -> + ok; +do_wait_subscription([CPid | Rest]) -> + try emqtt:subscriptions(CPid) of + [] -> + ok = timer:sleep(rand:uniform(?SLEEP)), + do_wait_subscription([CPid | Rest]); + [_ | _] -> + do_wait_subscription(Rest) + catch + exit:{noproc, _} -> + ok + end. + kick_client(Ctx, ClientId) -> ok = emqx_cm:kick_session(ClientId), Ctx. publish_msg(Ctx, Msg) -> ok = timer:sleep(rand:uniform(?SLEEP)), - case emqx:publish(Msg) of + case emqx:publish(Msg#message{timestamp = emqx_message:timestamp_now()}) of [] -> publish_msg(Ctx, Msg); [_ | _] -> Ctx end. -stop_client(Ctx = #{client := [CPid | _]}) -> - ok = timer:sleep(?SLEEP), +stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) -> + ok = timer:sleep(Sleep), ok = emqtt:stop(CPid), Ctx. @@ -904,14 +997,18 @@ assert_messages_missed(Ls1, Ls2) -> error end. -assert_messages_order([], []) -> +assert_messages_order([] = _Expected, _Received) -> ok; assert_messages_order([Msg | Expected], Received) -> %% Account for duplicate messages: case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of - {[], [#{payload := Mismatch} | _]} -> + {[], [#{timestamp := TSMismatch, payload := Mismatch} | _]} -> ct:fail("Message order is not correct, expected: ~p, received: ~p", [ - emqx_message:payload(Msg), Mismatch + { + emqx_utils_calendar:epoch_to_rfc3339(emqx_message:timestamp(Msg)), + emqx_message:payload(Msg) + }, + {emqx_utils_calendar:epoch_to_rfc3339(TSMismatch), Mismatch} ]), error; {_Matching, Rest} ->