diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 736c911ff..1f358e3c0 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -25,6 +25,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -244,8 +245,9 @@ t_session_subscription_idempotency(Config) -> fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] ), ?assertMatch( #{SubTopicFilter := #{}}, @@ -321,8 +323,9 @@ t_session_unsubscription_idempotency(Config) -> fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), ConnInfo = #{peername => {undefined, undefined}}, + WillMsg = undefined, Session = erpc:call( - Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo] + Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg] ), ?assertEqual( #{}, diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 11ac8f582..71cfccdea 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -26,6 +26,7 @@ {emqx_ds,4}. {emqx_eviction_agent,1}. {emqx_eviction_agent,2}. +{emqx_eviction_agent,3}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_fs,1}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a3480e232..59786afea 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -584,11 +584,12 @@ process_connect( AckProps, Channel = #channel{ conninfo = ConnInfo, - clientinfo = ClientInfo + clientinfo = ClientInfo, + will_msg = MaybeWillMsg } ) -> #{clean_start := CleanStart} = ConnInfo, - case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of + case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, MaybeWillMsg) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel)); @@ -1019,7 +1020,6 @@ handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = Conn [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps ), - return_connack( ?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), ensure_keepalive(NAckProps, Channel) @@ -1378,9 +1378,9 @@ handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> handle_timeout( _TRef, will_message = TimerName, - Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} + Channel = #channel{will_msg = WillMsg} ) -> - (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), + (WillMsg =/= undefined) andalso publish_will_msg(Channel), {ok, clean_timer(TimerName, Channel#channel{will_msg = undefined})}; handle_timeout( _TRef, @@ -2302,19 +2302,17 @@ maybe_publish_will_msg( maybe_publish_will_msg( _Reason, Channel = #channel{ - conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg + conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId} } ) -> %% Unconditionally publish will message for MQTT 3.1.1 ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), - _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), + _ = publish_will_msg(Channel), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( Reason, Channel = #channel{ - clientinfo = ClientInfo, - conninfo = #{clientid := ClientId}, - will_msg = WillMsg + conninfo = #{clientid := ClientId} } ) when Reason =:= expired orelse @@ -2332,12 +2330,11 @@ maybe_publish_will_msg( %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), - _ = publish_will_msg(ClientInfo, WillMsg), + _ = publish_will_msg(Channel), remove_willmsg(Channel); maybe_publish_will_msg( takenover, Channel = #channel{ - clientinfo = ClientInfo, will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2355,7 +2352,7 @@ maybe_publish_will_msg( case will_delay_interval(WillMsg) of 0 -> ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), - _ = publish_will_msg(ClientInfo, WillMsg); + _ = publish_will_msg(Channel); I when I > 0 -> %% @NOTE Non-normative comment in MQTT 5.0 spec %% """ @@ -2370,7 +2367,6 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{ - clientinfo = ClientInfo, will_msg = WillMsg, conninfo = #{clientid := ClientId} } @@ -2381,7 +2377,7 @@ maybe_publish_will_msg( ?tp(debug, maybe_publish_will_msg_other_publish, #{ clientid => ClientId, reason => Reason }), - _ = publish_will_msg(ClientInfo, WillMsg), + _ = publish_will_msg(Channel), remove_willmsg(Channel); I when I > 0 -> ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), @@ -2396,8 +2392,11 @@ will_delay_interval(WillMsg) -> ). publish_will_msg( - ClientInfo = #{mountpoint := MountPoint}, - Msg = #message{topic = Topic} + #channel{ + session = Session, + clientinfo = ClientInfo = #{mountpoint := MountPoint}, + will_msg = Msg = #message{topic = Topic} + } ) -> Action = authz_action(Msg), PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow, @@ -2417,7 +2416,7 @@ publish_will_msg( false -> NMsg = emqx_mountpoint:mount(MountPoint, Msg), NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)}, - _ = emqx_broker:publish(NMsg2), + ok = emqx_session:publish_will_message(Session, NMsg2), ok end. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c14058f9a..072611949 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -47,7 +47,7 @@ ]). -export([ - open_session/3, + open_session/4, discard_session/1, discard_session/2, takeover_session_begin/1, @@ -110,6 +110,8 @@ chan_pid/0 ]). +-type message() :: emqx_types:message(). + -type chan_pid() :: pid(). -type channel_info() :: { @@ -266,24 +268,29 @@ set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) -> end. %% @doc Open a session. --spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) -> +-spec open_session( + _CleanStart :: boolean(), + emqx_types:clientinfo(), + emqx_types:conninfo(), + emqx_maybe:t(message()) +) -> {ok, #{ session := emqx_session:t(), present := boolean(), replay => _ReplayContext }} | {error, Reason :: term()}. -open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> ok = discard_session(ClientId), ok = emqx_session:destroy(ClientInfo, ConnInfo), - create_register_session(ClientInfo, ConnInfo, Self) + create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, Self) end); -open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> - case emqx_session:open(ClientInfo, ConnInfo) of + case emqx_session:open(ClientInfo, ConnInfo, MaybeWillMsg) of {true, Session, ReplayContext} -> ok = register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, replay => ReplayContext}}; @@ -293,8 +300,8 @@ open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo end end). -create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) -> - Session = emqx_session:create(ClientInfo, ConnInfo), +create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) -> + Session = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg), ok = register_channel(ClientId, ChanPid, ConnInfo), {ok, #{session => Session, present => false}}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d0c839598..72014bd37 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -34,8 +34,8 @@ %% Session API -export([ - create/3, - open/3, + create/4, + open/4, destroy/1 ]). @@ -66,6 +66,11 @@ terminate/2 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + %% Managment APIs: -export([ list_client_subscriptions/1 @@ -88,7 +93,7 @@ -ifdef(TEST). -export([ - session_open/2, + session_open/3, list_all_sessions/0 ]). -endif. @@ -155,6 +160,7 @@ -type stream_state() :: #srs{}. +-type message() :: emqx_types:message(). -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type millisecond() :: non_neg_integer(). -type clientinfo() :: emqx_types:clientinfo(). @@ -181,14 +187,14 @@ %% --spec create(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). -create(#{clientid := ClientID}, ConnInfo, Conf) -> - ensure_timers(session_ensure_new(ClientID, ConnInfo, Conf)). +create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) -> + ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)). --spec open(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> +open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we @@ -196,7 +202,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - case session_open(ClientID, ConnInfo) of + case session_open(ClientID, ConnInfo, MaybeWillMsg) of Session0 = #{} -> Session = Session0#{props => Conf}, {true, ensure_timers(Session), []}; @@ -679,9 +685,9 @@ sync(ClientId) -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id(), emqx_types:conninfo()) -> +-spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, NewConnInfo) -> +session_open(SessionId, NewConnInfo, MaybeWillMsg) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -699,7 +705,8 @@ session_open(SessionId, NewConnInfo) -> S3 = emqx_persistent_session_ds_state:set_peername( maps:get(peername, NewConnInfo), S2 ), - S = emqx_persistent_session_ds_state:commit(S3), + S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), + S = emqx_persistent_session_ds_state:commit(S4), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -714,9 +721,14 @@ session_open(SessionId, NewConnInfo) -> false end. --spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> +-spec session_ensure_new( + id(), + emqx_types:conninfo(), + emqx_maybe:t(message()), + emqx_session:conf() +) -> session(). -session_ensure_new(Id, ConnInfo, Conf) -> +session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -738,7 +750,8 @@ session_ensure_new(Id, ConnInfo, Conf) -> ?committed(?QOS_2) ] ), - S = emqx_persistent_session_ds_state:commit(S4), + S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), + S = emqx_persistent_session_ds_state:commit(S5), #{ id => Id, props => Conf, @@ -1191,6 +1204,15 @@ seqno_diff(?QOS_1, A, B) -> seqno_diff(?QOS_2, A, B) -> A - B. +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(session(), message()) -> ok. +publish_will_message(_Session, #message{} = _WillMsg) -> + %% TODO + ok. + %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 17e5d31e4..67ef485d5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -75,5 +75,6 @@ %% Unique integer used to create unique identities -define(last_id, last_id). -define(peername, peername). +-define(will_message, will_message). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 167709d8a..5de74fb06 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,6 +30,7 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_expiry_interval/1, set_expiry_interval/2]). +-export([get_will_message/1, set_will_message/2]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). @@ -58,6 +59,8 @@ %% Type declarations %%================================================================================ +-type message() :: emqx_types:message(). + -type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. @@ -288,6 +291,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_will_message(t()) -> emqx_maybe:t(message()). +get_will_message(Rec) -> + get_meta(?will_message, Rec). + +-spec set_will_message(emqx_maybe:t(message()), t()) -> t(). +set_will_message(Val, Rec) -> + set_meta(?will_message, Val, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 0723452f1..63963c242 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -55,8 +55,8 @@ -endif. -export([ - create/2, - open/2, + create/3, + open/3, destroy/1, destroy/2 ]). @@ -88,6 +88,11 @@ terminate/3 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + % Timers -export([ ensure_timer/3, @@ -175,57 +180,57 @@ %% Behaviour %% ------------------------------------------------------------------- --callback create(clientinfo(), conninfo(), conf()) -> +-callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> t(). --callback open(clientinfo(), conninfo(), conf()) -> +-callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. +-callback publish_will_message(t(), message()) -> ok. %%-------------------------------------------------------------------- %% Create a Session %%-------------------------------------------------------------------- --spec create(clientinfo(), conninfo()) -> t(). -create(ClientInfo, ConnInfo) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) -> t(). +create(ClientInfo, ConnInfo, MaybeWillMsg) -> Conf = get_session_conf(ClientInfo), - create(ClientInfo, ConnInfo, Conf). - -create(ClientInfo, ConnInfo, Conf) -> % FIXME error conditions - create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf). + create( + hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, MaybeWillMsg, Conf + ). -create(Mod, ClientInfo, ConnInfo, Conf) -> +create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> % FIXME error conditions - Session = Mod:create(ClientInfo, ConnInfo, Conf), + Session = Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]), Session. --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) -> {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. -open(ClientInfo, ConnInfo) -> +open(ClientInfo, ConnInfo, MaybeWillMsg) -> Conf = get_session_conf(ClientInfo), Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo), %% NOTE %% Try to look the existing session up in session stores corresponding to the given %% `Mods` in order, starting from the last one. - case try_open(Mods, ClientInfo, ConnInfo, Conf) of + case try_open(Mods, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> %% NOTE %% Nothing was found, create a new session with the `Default` implementation. - {false, create(Default, ClientInfo, ConnInfo, Conf)} + {false, create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf)} end. -try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) -> - case try_open(Rest, ClientInfo, ConnInfo, Conf) of +try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> + case try_open(Rest, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> - Mod:open(ClientInfo, ConnInfo, Conf) + Mod:open(ClientInfo, ConnInfo, MaybeWillMsg, Conf) end; -try_open([], _ClientInfo, _ConnInfo, _Conf) -> +try_open([], _ClientInfo, _ConnInfo, _MaybeWillMsg, _Conf) -> false. -spec get_session_conf(clientinfo()) -> conf(). @@ -635,3 +640,11 @@ choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) -> run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(t(), message()) -> ok. +publish_will_message(Session, WillMsg) -> + ?IMPL(Session):publish_will_message(Session, WillMsg). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 439057384..95222a4b3 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -58,8 +58,8 @@ -endif. -export([ - create/3, - open/3, + create/4, + open/4, destroy/1 ]). @@ -106,6 +106,11 @@ dedup/4 ]). +%% Will message handling +-export([ + publish_will_message/2 +]). + %% Export for CT -export([set_field/3]). @@ -127,6 +132,7 @@ -type session() :: #session{}. -type replayctx() :: [emqx_types:message()]. +-type message() :: emqx_types:message(). -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). @@ -151,11 +157,12 @@ %% Init a Session %%-------------------------------------------------------------------- --spec create(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> session(). create( #{zone := Zone, clientid := ClientId}, #{expiry_interval := EI, receive_maximum := ReceiveMax}, + _MaybeWillMsg, Conf ) -> QueueOpts = get_mqueue_conf(Zone), @@ -200,9 +207,9 @@ destroy(_Session) -> %% Open a (possibly existing) Session %%-------------------------------------------------------------------- --spec open(clientinfo(), conninfo(), emqx_session:conf()) -> +-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) -> {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. -open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> +open(ClientInfo = #{clientid := ClientId}, ConnInfo, _MaybeWillMsg, Conf) -> case emqx_cm:takeover_session_begin(ClientId) of {ok, SessionRemote, TakeoverState} -> Session0 = resume(ClientInfo, SessionRemote), @@ -797,6 +804,15 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. +%%-------------------------------------------------------------------- +%% Will message handling +%%-------------------------------------------------------------------- + +-spec publish_will_message(session(), message()) -> ok. +publish_will_message(#session{}, #message{} = WillMsg) -> + _ = emqx_broker:publish(WillMsg), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 078272571..4b3fa1209 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -288,13 +288,7 @@ t_handle_in_puback_id_not_found(_) -> % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)). t_bad_receive_maximum(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), C1 = channel(#{conn_state => idle}), {shutdown, protocol_error, _, _} = @@ -304,13 +298,7 @@ t_bad_receive_maximum(_) -> ). t_override_client_receive_maximum(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0), C1 = channel(#{conn_state => idle}), @@ -460,13 +448,7 @@ t_handle_in_expected_packet(_) -> emqx_channel:handle_in(packet, channel()). t_process_connect(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = emqx_channel:process_connect(#{}, channel(#{conn_state => idle})). @@ -604,13 +586,7 @@ t_handle_out_connack_sucess(_) -> ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_out_connack_response_information(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, @@ -624,13 +600,7 @@ t_handle_out_connack_response_information(_) -> ). t_handle_out_connack_not_response_information(_) -> - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = @@ -1017,13 +987,7 @@ t_ws_cookie_init(_) -> t_flapping_detect(_) -> emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000), Parent = self(), - ok = meck:expect( - emqx_cm, - open_session, - fun(true, _ClientInfo, _ConnInfo) -> - {ok, #{session => session(), present => false}} - end - ), + mock_cm_open_session(), ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), IdleChannel = channel( clientinfo(#{ @@ -1128,7 +1092,8 @@ session(ClientInfo, InitFields) when is_map(InitFields) -> #{ receive_maximum => 0, expiry_interval => 0 - } + }, + _WillMsg = undefined ), maps:fold( fun(Field, Value, SessionAcc) -> @@ -1139,6 +1104,15 @@ session(ClientInfo, InitFields) when is_map(InitFields) -> InitFields ). +mock_cm_open_session() -> + ok = meck:expect( + emqx_cm, + open_session, + fun(true, _ClientInfo, _ConnInfo, _MaybeWillMsg) -> + {ok, #{session => session(), present => false}} + end + ). + %% conn: 5/s; overall: 10/s quota() -> emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index e47cf6730..16245e272 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -59,6 +59,13 @@ init_per_suite(Config) -> end_per_suite(Config) -> emqx_cth_suite:stop(proplists:get_value(apps, Config)). +%%-------------------------------------------------------------------- +%% Helper fns +%%-------------------------------------------------------------------- + +open_session(CleanStart, ClientInfo, ConnInfo) -> + emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo, _WillMsg = undefined). + %%-------------------------------------------------------------------- %% TODO: Add more test cases %%-------------------------------------------------------------------- @@ -129,10 +136,10 @@ t_open_session(_) -> receive_maximum => 100 }, {ok, #{session := Session1, present := false}} = - emqx_cm:open_session(true, ClientInfo, ConnInfo), + open_session(true, ClientInfo, ConnInfo), ?assertEqual(100, emqx_session:info(inflight_max, Session1)), {ok, #{session := Session2, present := false}} = - emqx_cm:open_session(true, ClientInfo, ConnInfo), + open_session(true, ClientInfo, ConnInfo), ?assertEqual(100, emqx_session:info(inflight_max, Session2)), emqx_cm:unregister_channel(<<"clientid">>), @@ -163,7 +170,7 @@ t_open_session_race_condition(_) -> Parent = self(), OpenASession = fun() -> timer:sleep(rand:uniform(100)), - OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)), + OpenR = open_session(true, ClientInfo, ConnInfo), Parent ! OpenR, case OpenR of {ok, _} -> diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index f3f470d1e..7fffa3374 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -679,7 +679,8 @@ channel(InitFields) -> }, Session = emqx_session:create( ClientInfo, - #{receive_maximum => 0, expiry_interval => 1000} + #{receive_maximum => 0, expiry_interval => 1000}, + _WillMsg = undefined ), maps:fold( fun(Field, Value, Channel) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index a5c171f67..088023ec2 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -1240,6 +1241,49 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). +%% Check that we get a single will message when the client disconnects with a non +%% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0, +%% QoS = 1. +t_will_message1(Config) -> + ConnFun = ?config(conn_fun, Config), + WillTopic = ?config(topic, Config), + WillPayload = <<"will message">>, + ClientId = ?config(client_id, Config), + ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000), + + ?check_trace( + #{timetrap => 15_000}, + begin + ok = emqx:subscribe(WillTopic, #{qos => 2}), + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 1}}, + {will_topic, WillTopic}, + {will_payload, WillPayload}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 1}} + | Config + ]), + {{ok, _}, {ok, _}} = + ?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}), + ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR), + + ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000), + %% No duplicates + ?assertNotReceive({deliver, WillTopic, _}, 1_000), + + ok + end, + [] + ), + ok. +t_will_message1(init, Config) -> + Config; +t_will_message1('end', _Config) -> + ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}), + ok. + get_topicwise_order(Msgs) -> maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). @@ -1267,3 +1311,7 @@ pick_respective_msgs(MsgRefs, Msgs) -> debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), ct:pal("*** State:~n~p", [Info]). + +on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) -> + ?tp(client_connack, #{}), + ok. diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index ec98388d7..ec08a118d 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -67,6 +67,7 @@ t_session_init(_) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, + _WillMsg = undefined, emqx_session:get_session_conf(ClientInfo) ), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), @@ -531,6 +532,7 @@ session(InitFields) when is_map(InitFields) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, + _WillMsg = undefined, emqx_session:get_session_conf(ClientInfo) ), maps:fold( diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index b9e40b454..59f201864 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -613,7 +613,8 @@ channel(InitFields) -> }, Session = emqx_session:create( ClientInfo, - #{receive_maximum => 0, expiry_interval => 0} + #{receive_maximum => 0, expiry_interval => 0}, + _WillMsg = undefined ), maps:fold( fun(Field, Value, Channel) -> diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index cc415d495..10a464f26 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.1.5"}, + {vsn, "5.1.6"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 2f0bc78a1..5c02da6fc 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -27,12 +27,15 @@ evict_connections/1, evict_sessions/2, evict_sessions/3, - purge_sessions/1, - evict_session_channel/3 + purge_sessions/1 ]). %% RPC targets --export([all_local_channels_count/0]). +-export([ + all_local_channels_count/0, + evict_session_channel/3, + do_evict_session_channel_v3/4 +]). -behaviour(gen_server). @@ -397,12 +400,23 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) -> Res end. +%% RPC target for `emqx_eviction_agent_proto_v2' -spec evict_session_channel( emqx_types:clientid(), emqx_types:conninfo(), emqx_types:clientinfo() ) -> supervisor:startchild_ret(). evict_session_channel(ClientId, ConnInfo, ClientInfo) -> + do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, _WillMsg = undefined). + +%% RPC target for `emqx_eviction_agent_proto_v3' +-spec do_evict_session_channel_v3( + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo(), + emqx_maybe:t(emqx_types:message()) +) -> supervisor:startchild_ret(). +do_evict_session_channel_v3(ClientId, ConnInfo, ClientInfo, MaybeWillMsg) -> ?SLOG(info, #{ msg => "evict_session_channel", client_id => ClientId, @@ -412,7 +426,8 @@ evict_session_channel(ClientId, ConnInfo, ClientInfo) -> Result = emqx_eviction_agent_channel:start_supervised( #{ conninfo => ConnInfo, - clientinfo => ClientInfo + clientinfo => ClientInfo, + will_message => MaybeWillMsg } ), ?SLOG( diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index 55cc4a2c9..20ee129f3 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -32,7 +32,8 @@ -type opts() :: #{ conninfo := emqx_types:conninfo(), - clientinfo := emqx_types:clientinfo() + clientinfo := emqx_types:clientinfo(), + will_message => emqx_maybe:t(emqx_types:message()) }. %%-------------------------------------------------------------------- @@ -81,11 +82,12 @@ stop(Pid) -> %% gen_server API %%-------------------------------------------------------------------- -init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo}]) -> +init([#{conninfo := OldConnInfo, clientinfo := #{clientid := ClientId} = OldClientInfo} = Opts]) -> process_flag(trap_exit, true), ClientInfo = clientinfo(OldClientInfo), ConnInfo = conninfo(OldConnInfo), - case open_session(ConnInfo, ClientInfo) of + MaybeWillMsg = maps:get(will_message, Opts, undefined), + case open_session(ConnInfo, ClientInfo, MaybeWillMsg) of {ok, Channel0} -> case set_expiry_timer(Channel0) of {ok, Channel1} -> @@ -221,9 +223,9 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) -> {error, should_be_expired} end. -open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> +open_session(ConnInfo, #{clientid := ClientId} = ClientInfo, MaybeWillMsg) -> Channel = channel(ConnInfo, ClientInfo), - case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of + case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo, MaybeWillMsg) of {ok, #{present := false}} -> ?SLOG( info, diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl index 7c8f47faa..0db75d326 100644 --- a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl @@ -8,6 +8,7 @@ -export([ introduced_in/0, + deprecated_since/0, evict_session_channel/4, @@ -20,6 +21,9 @@ introduced_in() -> "5.2.1". +deprecated_since() -> + "5.7.0". + -spec evict_session_channel( node(), emqx_types:clientid(), diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl new file mode 100644 index 000000000..45950a006 --- /dev/null +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v3.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_eviction_agent_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + all_channels_count/2, + + %% Changed in v3: + evict_session_channel/5 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()). +all_channels_count(Nodes, Timeout) -> + erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout). + +%% Changed in v3: +-spec evict_session_channel( + node(), + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo(), + emqx_maybe:t(emqx_types:message()) +) -> supervisor:startchild_err() | emqx_rpc:badrpc(). +evict_session_channel(Node, ClientId, ConnInfo, ClientInfo, MaybeWillMsg) -> + rpc:call( + Node, + emqx_eviction_agent, + do_evict_session_channel_v3, + [ClientId, ConnInfo, ClientInfo, MaybeWillMsg] + ). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 79d4c2164..b840d53a3 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -373,10 +373,11 @@ process_connect( Channel = #channel{ ctx = Ctx, conninfo = ConnInfo = #{clean_start := CleanStart}, - clientinfo = ClientInfo + clientinfo = ClientInfo, + will_msg = MaybeWillMsg } ) -> - SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end, + SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT, MaybeWillMsg) end, case emqx_gateway_ctx:open_session( Ctx, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl index edae3dcf8..94f470f88 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -19,7 +19,7 @@ -export([registry/1, set_registry/2]). -export([ - init/1, + init/2, info/1, info/2, stats/1 @@ -52,12 +52,12 @@ -export_type([session/0]). -init(ClientInfo) -> +init(ClientInfo, MaybeWillMsg) -> ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, SessionConf = emqx_session:get_session_conf(ClientInfo), #{ registry => emqx_mqttsn_registry:init(), - session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf) + session => emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf) }. registry(#{registry := Registry}) ->