feat(sessds): Add zone overrides for session durability settings

This commit is contained in:
ieQu1 2024-03-19 16:02:47 +01:00
parent 749ad73819
commit 0547b32727
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
6 changed files with 66 additions and 37 deletions

View File

@ -19,9 +19,10 @@
-behaviour(emqx_config_handler).
-include("emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-export([init/0]).
-export([is_persistence_enabled/0, force_ds/0]).
-export([is_persistence_enabled/0, is_persistence_enabled/1, force_ds/1]).
%% Config handler
-export([add_handler/0, pre_config_update/3]).
@ -32,6 +33,7 @@
]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WHEN_ENABLED(DO),
case is_persistence_enabled() of
@ -43,7 +45,14 @@
%%--------------------------------------------------------------------
init() ->
%% Note: currently persistence can't be enabled or disabled in the
%% runtime. If persistence is enabled for any of the zones, we
%% consider durability feature to be on:
Zones = maps:keys(emqx_config:get([zones])),
IsEnabled = lists:any(fun is_persistence_enabled/1, Zones),
persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
?WHEN_ENABLED(begin
?SLOG(notice, #{msg => "Session durability is enabled"}),
Backend = storage_backend(),
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
ok = emqx_persistent_session_ds_router:init_tables(),
@ -53,7 +62,11 @@ init() ->
-spec is_persistence_enabled() -> boolean().
is_persistence_enabled() ->
emqx_config:get([session_persistence, enable]).
persistent_term:get(?PERSISTENCE_ENABLED).
-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
is_persistence_enabled(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, enable]).
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
@ -61,9 +74,9 @@ storage_backend() ->
%% Dev-only option: force all messages to go through
%% `emqx_persistent_session_ds':
-spec force_ds() -> boolean().
force_ds() ->
emqx_config:get([session_persistence, force_persistence]).
-spec force_ds(emqx_types:zone()) -> boolean().
force_ds(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]).
storage_backend(Path) ->
ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),

View File

@ -471,7 +471,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
Timeout =
case Publishes of
[] ->
emqx_config:get([session_persistence, idle_poll_interval]);
get_config(ClientInfo, [idle_poll_interval]);
[_ | _] ->
0
end,
@ -480,10 +480,10 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS,
Interval,
@ -768,7 +768,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) ->
fetch_new_messages([], Session, _ClientInfo) ->
Session;
fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
BatchSize = emqx_config:get([session_persistence, batch_size]),
BatchSize = get_config(ClientInfo, [batch_size]),
case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
true ->
%% Buffer is full:
@ -1024,9 +1024,15 @@ receive_maximum(ConnInfo) ->
expiry_interval(ConnInfo) ->
maps:get(expiry_interval, ConnInfo, 0).
%% Note: we don't allow overriding `last_alive_update_interval' per
%% zone, since the GC process is responsible for all sessions
%% regardless of the zone.
bump_interval() ->
emqx_config:get([session_persistence, last_alive_update_interval]).
get_config(#{zone := Zone}, Key) ->
emqx_config:get_zone_conf(Zone, [session_persistence | Key]).
-spec try_get_live_session(emqx_types:clientid()) ->
{pid(), session()} | not_found | not_persistent.
try_get_live_session(ClientId) ->

View File

@ -192,7 +192,7 @@ create(ClientInfo, ConnInfo) ->
create(ClientInfo, ConnInfo, Conf) ->
% FIXME error conditions
create(choose_impl_mod(ConnInfo), ClientInfo, ConnInfo, Conf).
create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf).
create(Mod, ClientInfo, ConnInfo, Conf) ->
% FIXME error conditions
@ -205,7 +205,7 @@ create(Mod, ClientInfo, ConnInfo, Conf) ->
{_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
open(ClientInfo, ConnInfo) ->
Conf = get_session_conf(ClientInfo),
Mods = [Default | _] = choose_impl_candidates(ConnInfo),
Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo),
%% NOTE
%% Try to look the existing session up in session stores corresponding to the given
%% `Mods` in order, starting from the last one.
@ -253,7 +253,7 @@ destroy(ClientInfo, ConnInfo) ->
%% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' =
%% true. So we may simply destroy sessions from all implementations, since the key
%% (ClientID) is the same.
Mods = choose_impl_candidates(ConnInfo),
Mods = choose_impl_candidates(ClientInfo, ConnInfo),
lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods).
-spec destroy(t()) -> ok.
@ -610,31 +610,26 @@ maybe_mock_impl_mod(Session) ->
error(noimpl, [Session]).
-endif.
-spec choose_impl_mod(conninfo()) -> module().
choose_impl_mod(#{expiry_interval := EI}) ->
hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())).
-spec choose_impl_candidates(conninfo()) -> [module()].
choose_impl_candidates(#{expiry_interval := EI}) ->
choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()).
choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
[emqx_session_mem];
choose_impl_candidates(0, _IsPSStoreEnabled = true) ->
case emqx_persistent_message:force_ds() of
choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) ->
case emqx_persistent_message:is_persistence_enabled(Zone) of
false ->
%% NOTE
%% If ExpiryInterval is 0, the natural choice is
%% `emqx_session_mem'. Yet we still need to look the
%% existing session up in the `emqx_persistent_session_ds'
%% store first, because previous connection may have set
%% ExpiryInterval to a non-zero value.
[emqx_session_mem, emqx_persistent_session_ds];
[emqx_session_mem];
true ->
[emqx_persistent_session_ds]
end;
choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 ->
[emqx_persistent_session_ds].
Force = emqx_persistent_message:force_ds(Zone),
case EI of
0 when not Force ->
%% NOTE
%% If ExpiryInterval is 0, the natural choice is
%% `emqx_session_mem'. Yet we still need to look
%% the existing session up in the
%% `emqx_persistent_session_ds' store first,
%% because previous connection may have set
%% ExpiryInterval to a non-zero value.
[emqx_session_mem, emqx_persistent_session_ds];
_ ->
[emqx_persistent_session_ds]
end
end.
-compile({inline, [run_hook/2]}).
run_hook(Name, Args) ->

View File

@ -33,7 +33,8 @@ roots() ->
force_shutdown,
conn_congestion,
force_gc,
overload_protection
overload_protection,
session_persistence
].
zones_without_default() ->

View File

@ -463,5 +463,17 @@ zone_global_defaults() ->
backoff_new_conn => true,
enable => false
},
stats => #{enable => true}
stats => #{enable => true},
session_persistence =>
#{
enable => false,
batch_size => 100,
force_persistence => false,
idle_poll_interval => 100,
last_alive_update_interval => 5000,
message_retention_period => 86400000,
renew_streams_interval => 5000,
session_gc_batch_size => 100,
session_gc_interval => 600000
}
}.

View File

@ -0,0 +1,2 @@
Make it possible to override `session_persistence` settings per zone.
Since durable sessions are inherently more expensive to maintain than the regular sessions, it's desirable to grant the operator finer control of session durability for different classes of clients.