From 0547b3272702784c707ab74aec4cd51631b38c8e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:02:47 +0100 Subject: [PATCH] feat(sessds): Add zone overrides for session durability settings --- apps/emqx/src/emqx_persistent_message.erl | 23 +++++++--- apps/emqx/src/emqx_persistent_session_ds.erl | 14 ++++-- apps/emqx/src/emqx_session.erl | 47 +++++++++----------- apps/emqx/src/emqx_zone_schema.erl | 3 +- apps/emqx/test/emqx_config_SUITE.erl | 14 +++++- changes/ce/feat-12739.en.md | 2 + 6 files changed, 66 insertions(+), 37 deletions(-) create mode 100644 changes/ce/feat-12739.en.md diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 36ad8a3df..e3fa23296 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -19,9 +19,10 @@ -behaviour(emqx_config_handler). -include("emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([init/0]). --export([is_persistence_enabled/0, force_ds/0]). +-export([is_persistence_enabled/0, is_persistence_enabled/1, force_ds/1]). %% Config handler -export([add_handler/0, pre_config_update/3]). @@ -32,6 +33,7 @@ ]). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled). -define(WHEN_ENABLED(DO), case is_persistence_enabled() of @@ -43,7 +45,14 @@ %%-------------------------------------------------------------------- init() -> + %% Note: currently persistence can't be enabled or disabled in the + %% runtime. 
If persistence is enabled for any of the zones, we + %% consider durability feature to be on: + Zones = maps:keys(emqx_config:get([zones])), + IsEnabled = lists:any(fun is_persistence_enabled/1, Zones), + persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled), ?WHEN_ENABLED(begin + ?SLOG(notice, #{msg => "Session durability is enabled"}), Backend = storage_backend(), ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend), ok = emqx_persistent_session_ds_router:init_tables(), @@ -53,7 +62,11 @@ init() -> -spec is_persistence_enabled() -> boolean(). is_persistence_enabled() -> - emqx_config:get([session_persistence, enable]). + persistent_term:get(?PERSISTENCE_ENABLED). + +-spec is_persistence_enabled(emqx_types:zone()) -> boolean(). +is_persistence_enabled(Zone) -> + emqx_config:get_zone_conf(Zone, [session_persistence, enable]). -spec storage_backend() -> emqx_ds:create_db_opts(). storage_backend() -> @@ -61,9 +74,9 @@ storage_backend() -> %% Dev-only option: force all messages to go through %% `emqx_persistent_session_ds': --spec force_ds() -> boolean(). -force_ds() -> - emqx_config:get([session_persistence, force_persistence]). +-spec force_ds(emqx_types:zone()) -> boolean(). +force_ds(Zone) -> + emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]). 
storage_backend(Path) -> ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path), diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 8cf3cb284..679ce3164 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -471,7 +471,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> Timeout = case Publishes of [] -> - emqx_config:get([session_persistence, idle_poll_interval]); + get_config(ClientInfo, [idle_poll_interval]); [_ | _] -> 0 end, @@ -480,10 +480,10 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; -handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> +handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), - Interval = emqx_config:get([session_persistence, renew_streams_interval]), + Interval = get_config(ClientInfo, [renew_streams_interval]), Session = emqx_session:ensure_timer( ?TIMER_GET_STREAMS, Interval, @@ -768,7 +768,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) -> fetch_new_messages([], Session, _ClientInfo) -> Session; fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) -> - BatchSize = emqx_config:get([session_persistence, batch_size]), + BatchSize = get_config(ClientInfo, [batch_size]), case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of true -> %% Buffer is full: @@ -1024,9 +1024,15 @@ receive_maximum(ConnInfo) -> expiry_interval(ConnInfo) -> maps:get(expiry_interval, ConnInfo, 0). +%% Note: we don't allow overriding `last_alive_update_interval' per +%% zone, since the GC process is responsible for all sessions +%% regardless of the zone. 
bump_interval() -> emqx_config:get([session_persistence, last_alive_update_interval]). +get_config(#{zone := Zone}, Key) -> + emqx_config:get_zone_conf(Zone, [session_persistence | Key]). + -spec try_get_live_session(emqx_types:clientid()) -> {pid(), session()} | not_found | not_persistent. try_get_live_session(ClientId) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index de9af5388..0723452f1 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -192,7 +192,7 @@ create(ClientInfo, ConnInfo) -> create(ClientInfo, ConnInfo, Conf) -> % FIXME error conditions - create(choose_impl_mod(ConnInfo), ClientInfo, ConnInfo, Conf). + create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf). create(Mod, ClientInfo, ConnInfo, Conf) -> % FIXME error conditions @@ -205,7 +205,7 @@ create(Mod, ClientInfo, ConnInfo, Conf) -> {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. open(ClientInfo, ConnInfo) -> Conf = get_session_conf(ClientInfo), - Mods = [Default | _] = choose_impl_candidates(ConnInfo), + Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo), %% NOTE %% Try to look the existing session up in session stores corresponding to the given %% `Mods` in order, starting from the last one. @@ -253,7 +253,7 @@ destroy(ClientInfo, ConnInfo) -> %% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' = %% true. So we may simply destroy sessions from all implementations, since the key %% (ClientID) is the same. - Mods = choose_impl_candidates(ConnInfo), + Mods = choose_impl_candidates(ClientInfo, ConnInfo), lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods). -spec destroy(t()) -> ok. @@ -610,31 +610,26 @@ maybe_mock_impl_mod(Session) -> error(noimpl, [Session]). -endif. --spec choose_impl_mod(conninfo()) -> module(). 
-choose_impl_mod(#{expiry_interval := EI}) -> - hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())). - --spec choose_impl_candidates(conninfo()) -> [module()]. -choose_impl_candidates(#{expiry_interval := EI}) -> - choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()). - -choose_impl_candidates(_, _IsPSStoreEnabled = false) -> - [emqx_session_mem]; -choose_impl_candidates(0, _IsPSStoreEnabled = true) -> - case emqx_persistent_message:force_ds() of +choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) -> + case emqx_persistent_message:is_persistence_enabled(Zone) of false -> - %% NOTE - %% If ExpiryInterval is 0, the natural choice is - %% `emqx_session_mem'. Yet we still need to look the - %% existing session up in the `emqx_persistent_session_ds' - %% store first, because previous connection may have set - %% ExpiryInterval to a non-zero value. - [emqx_session_mem, emqx_persistent_session_ds]; + [emqx_session_mem]; true -> - [emqx_persistent_session_ds] - end; -choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 -> - [emqx_persistent_session_ds]. + Force = emqx_persistent_message:force_ds(Zone), + case EI of + 0 when not Force -> + %% NOTE + %% If ExpiryInterval is 0, the natural choice is + %% `emqx_session_mem'. Yet we still need to look + %% the existing session up in the + %% `emqx_persistent_session_ds' store first, + %% because previous connection may have set + %% ExpiryInterval to a non-zero value. + [emqx_session_mem, emqx_persistent_session_ds]; + _ -> + [emqx_persistent_session_ds] + end + end. -compile({inline, [run_hook/2]}). 
run_hook(Name, Args) -> diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index aafd499b7..563edf6a6 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -33,7 +33,8 @@ roots() -> force_shutdown, conn_congestion, force_gc, - overload_protection + overload_protection, + session_persistence ]. zones_without_default() -> diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 77fd28f11..c2dc22288 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -463,5 +463,17 @@ zone_global_defaults() -> backoff_new_conn => true, enable => false }, - stats => #{enable => true} + stats => #{enable => true}, + session_persistence => + #{ + enable => false, + batch_size => 100, + force_persistence => false, + idle_poll_interval => 100, + last_alive_update_interval => 5000, + message_retention_period => 86400000, + renew_streams_interval => 5000, + session_gc_batch_size => 100, + session_gc_interval => 600000 + } }. diff --git a/changes/ce/feat-12739.en.md b/changes/ce/feat-12739.en.md new file mode 100644 index 000000000..833918fc5 --- /dev/null +++ b/changes/ce/feat-12739.en.md @@ -0,0 +1,2 @@ +Make it possible to override `session_persistence` settings per zone. +Since durable sessions are inherently more expensive to maintain than regular sessions, it's desirable to grant the operator finer control over session durability for different classes of clients.