feat(schema): introduce separate root for new session persistence

With some knobs to choose a storage backend. Support only builtin
RocksDB-based backend with minimal configuration for now.
This commit is contained in:
Andrew Mayorov 2023-11-20 23:43:45 +07:00
parent 8e107ffe45
commit 110a5a4896
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
10 changed files with 124 additions and 37 deletions

View File

@ -79,7 +79,7 @@ cluster(#{n := N}) ->
app_specs() -> app_specs() ->
[ [
emqx_durable_storage, emqx_durable_storage,
{emqx, "persistent_session_store = {ds = true}"} {emqx, "session_persistence = {enable = true}"}
]. ].
get_mqtt_port(Node, Type) -> get_mqtt_port(Node, Type) ->

View File

@ -19,7 +19,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-export([init/0]). -export([init/0]).
-export([is_store_enabled/0]). -export([is_persistence_enabled/0]).
%% Message persistence %% Message persistence
-export([ -export([
@ -28,9 +28,8 @@
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
%% FIXME
-define(WHEN_ENABLED(DO), -define(WHEN_ENABLED(DO),
case is_store_enabled() of case is_persistence_enabled() of
true -> DO; true -> DO;
false -> {skipped, disabled} false -> {skipped, disabled}
end end
@ -40,18 +39,26 @@
init() -> init() ->
?WHEN_ENABLED(begin ?WHEN_ENABLED(begin
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ Backend = storage_backend(),
backend => builtin, ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
storage => {emqx_ds_storage_bitfield_lts, #{}}
}),
ok = emqx_persistent_session_ds_router:init_tables(), ok = emqx_persistent_session_ds_router:init_tables(),
ok = emqx_persistent_session_ds:create_tables(), ok = emqx_persistent_session_ds:create_tables(),
ok ok
end). end).
-spec is_store_enabled() -> boolean(). -spec is_persistence_enabled() -> boolean().
is_store_enabled() -> is_persistence_enabled() ->
emqx_config:get([persistent_session_store, ds]). emqx_config:get([session_persistence, enable]).
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
storage_backend(emqx_config:get([session_persistence, storage])).
storage_backend(#{builtin := #{enable := true}}) ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -294,7 +294,19 @@ roots(low) ->
{"persistent_session_store", {"persistent_session_store",
sc( sc(
ref("persistent_session_store"), ref("persistent_session_store"),
#{importance => ?IMPORTANCE_HIDDEN} #{
%% NOTE
%% Due to some quirks in interaction between `emqx_config` and
%% `hocon_tconf`, schema roots cannot currently be deprecated.
importance => ?IMPORTANCE_HIDDEN
}
)},
{"session_persistence",
sc(
ref("session_persistence"),
#{
importance => ?IMPORTANCE_HIDDEN
}
)}, )},
{"trace", {"trace",
sc( sc(
@ -309,11 +321,12 @@ roots(low) ->
]. ].
fields("persistent_session_store") -> fields("persistent_session_store") ->
Deprecated = #{deprecated => {since, "5.4.0"}},
[ [
{"enabled", {"enabled",
sc( sc(
boolean(), boolean(),
#{ Deprecated#{
default => false, default => false,
%% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias %% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias
aliases => [enable], aliases => [enable],
@ -323,7 +336,7 @@ fields("persistent_session_store") ->
{"ds", {"ds",
sc( sc(
boolean(), boolean(),
#{ Deprecated#{
default => false, default => false,
importance => ?IMPORTANCE_HIDDEN importance => ?IMPORTANCE_HIDDEN
} }
@ -331,7 +344,7 @@ fields("persistent_session_store") ->
{"on_disc", {"on_disc",
sc( sc(
boolean(), boolean(),
#{ Deprecated#{
default => true, default => true,
desc => ?DESC(persistent_store_on_disc) desc => ?DESC(persistent_store_on_disc)
} }
@ -339,7 +352,7 @@ fields("persistent_session_store") ->
{"ram_cache", {"ram_cache",
sc( sc(
boolean(), boolean(),
#{ Deprecated#{
default => false, default => false,
desc => ?DESC(persistent_store_ram_cache) desc => ?DESC(persistent_store_ram_cache)
} }
@ -347,7 +360,7 @@ fields("persistent_session_store") ->
{"backend", {"backend",
sc( sc(
hoconsc:union([ref("persistent_session_builtin")]), hoconsc:union([ref("persistent_session_builtin")]),
#{ Deprecated#{
default => #{ default => #{
<<"type">> => <<"builtin">>, <<"type">> => <<"builtin">>,
<<"session">> => <<"session">> =>
@ -363,7 +376,7 @@ fields("persistent_session_store") ->
{"max_retain_undelivered", {"max_retain_undelivered",
sc( sc(
duration(), duration(),
#{ Deprecated#{
default => <<"1h">>, default => <<"1h">>,
desc => ?DESC(persistent_session_store_max_retain_undelivered) desc => ?DESC(persistent_session_store_max_retain_undelivered)
} }
@ -371,7 +384,7 @@ fields("persistent_session_store") ->
{"message_gc_interval", {"message_gc_interval",
sc( sc(
duration(), duration(),
#{ Deprecated#{
default => <<"1h">>, default => <<"1h">>,
desc => ?DESC(persistent_session_store_message_gc_interval) desc => ?DESC(persistent_session_store_message_gc_interval)
} }
@ -379,7 +392,7 @@ fields("persistent_session_store") ->
{"session_message_gc_interval", {"session_message_gc_interval",
sc( sc(
duration(), duration(),
#{ Deprecated#{
default => <<"1m">>, default => <<"1m">>,
desc => ?DESC(persistent_session_store_session_message_gc_interval) desc => ?DESC(persistent_session_store_session_message_gc_interval)
} }
@ -1740,6 +1753,45 @@ fields("trace") ->
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(fields_trace_payload_encode) desc => ?DESC(fields_trace_payload_encode)
})} })}
];
fields("session_persistence") ->
[
{"enable",
sc(
boolean(), #{
desc => ?DESC(session_persistence_enable),
default => false
}
)},
{"storage",
sc(
ref("session_storage_backend"), #{
desc => ?DESC(session_persistence_storage),
validator => fun validate_backend_enabled/1,
default => #{
<<"builtin">> => #{}
}
}
)}
];
fields("session_storage_backend") ->
[
{"builtin",
sc(ref("session_storage_backend_builtin"), #{
desc => ?DESC(session_storage_backend_builtin),
required => {false, recursively}
})}
];
fields("session_storage_backend_builtin") ->
[
{"enable",
sc(
boolean(),
#{
desc => ?DESC(session_storage_backend_enable),
default => true
}
)}
]. ].
mqtt_listener(Bind) -> mqtt_listener(Bind) ->
@ -1992,6 +2044,8 @@ desc("ocsp") ->
"Per listener OCSP Stapling configuration."; "Per listener OCSP Stapling configuration.";
desc("crl_cache") -> desc("crl_cache") ->
"Global CRL cache options."; "Global CRL cache options.";
desc("session_persistence") ->
"Settings governing durable sessions persistence.";
desc(_) -> desc(_) ->
undefined. undefined.
@ -2014,6 +2068,17 @@ ensure_list(V) ->
filter(Opts) -> filter(Opts) ->
[{K, V} || {K, V} <- Opts, V =/= undefined]. [{K, V} || {K, V} <- Opts, V =/= undefined].
validate_backend_enabled(Config) ->
Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
case maps:to_list(Enabled) of
[{_Type, _BackendConfig}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end.
%% @private This function defines the SSL opts which are commonly used by %% @private This function defines the SSL opts which are commonly used by
%% SSL listener and client. %% SSL listener and client.
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema(). -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().

View File

@ -617,11 +617,11 @@ maybe_mock_impl_mod(_) ->
-spec choose_impl_mod(conninfo()) -> module(). -spec choose_impl_mod(conninfo()) -> module().
choose_impl_mod(#{expiry_interval := EI}) -> choose_impl_mod(#{expiry_interval := EI}) ->
hd(choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled())). hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())).
-spec choose_impl_candidates(conninfo()) -> [module()]. -spec choose_impl_candidates(conninfo()) -> [module()].
choose_impl_candidates(#{expiry_interval := EI}) -> choose_impl_candidates(#{expiry_interval := EI}) ->
choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled()). choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()).
choose_impl_candidates(_, _IsPSStoreEnabled = false) -> choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
[emqx_session_mem]; [emqx_session_mem];

View File

@ -291,7 +291,7 @@ publish(Node, Message) ->
app_specs() -> app_specs() ->
[ [
emqx_durable_storage, emqx_durable_storage,
{emqx, "persistent_session_store {ds = true}"} {emqx, "session_persistence {enable = true}"}
]. ].
cluster() -> cluster() ->

View File

@ -35,8 +35,8 @@ all() ->
% NOTE % NOTE
% Tests are disabled while existing session persistence impl is being % Tests are disabled while existing session persistence impl is being
% phased out. % phased out.
{group, persistent_store_disabled}, {group, persistence_disabled},
{group, persistent_store_ds} {group, persistence_enabled}
]. ].
%% A persistent session can be resumed in two ways: %% A persistent session can be resumed in two ways:
@ -54,24 +54,24 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
TCsNonGeneric = [t_choose_impl], TCsNonGeneric = [t_choose_impl],
[ [
{persistent_store_disabled, [{group, no_kill_connection_process}]}, {persistence_disabled, [{group, no_kill_connection_process}]},
{persistent_store_ds, [{group, no_kill_connection_process}]}, {persistence_enabled, [{group, no_kill_connection_process}]},
{no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
{tcp, [], TCs}, {tcp, [], TCs},
{quic, [], TCs -- TCsNonGeneric}, {quic, [], TCs -- TCsNonGeneric},
{ws, [], TCs -- TCsNonGeneric} {ws, [], TCs -- TCsNonGeneric}
]. ].
init_per_group(persistent_store_disabled, Config) -> init_per_group(persistence_disabled, Config) ->
[ [
{emqx_config, "persistent_session_store { enabled = false }"}, {emqx_config, "session_persistence { enable = false }"},
{persistent_store, false} {persistence, false}
| Config | Config
]; ];
init_per_group(persistent_store_ds, Config) -> init_per_group(persistence_enabled, Config) ->
[ [
{emqx_config, "persistent_session_store { ds = true }"}, {emqx_config, "session_persistence { enable = true }"},
{persistent_store, ds} {persistence, ds}
| Config | Config
]; ];
init_per_group(Group, Config) when Group == tcp -> init_per_group(Group, Config) when Group == tcp ->
@ -312,7 +312,7 @@ t_choose_impl(Config) ->
{ok, _} = emqtt:ConnFun(Client), {ok, _} = emqtt:ConnFun(Client),
[ChanPid] = emqx_cm:lookup_channels(ClientId), [ChanPid] = emqx_cm:lookup_channels(ClientId),
?assertEqual( ?assertEqual(
case ?config(persistent_store, Config) of case ?config(persistence, Config) of
false -> emqx_session_mem; false -> emqx_session_mem;
ds -> emqx_persistent_session_ds ds -> emqx_persistent_session_ds
end, end,
@ -878,7 +878,7 @@ t_multiple_subscription_matches(Config) ->
ok = emqtt:disconnect(Client2). ok = emqtt:disconnect(Client2).
skip_ds_tc(Config) -> skip_ds_tc(Config) ->
case ?config(persistent_store, Config) of case ?config(persistence, Config) of
ds -> ds ->
{skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"}; {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
_ -> _ ->

View File

@ -38,7 +38,7 @@ init_per_suite(Config) ->
AppSpecs = [ AppSpecs = [
emqx_durable_storage, emqx_durable_storage,
{emqx, #{ {emqx, #{
config => #{persistent_session_store => #{ds => true}}, config => #{session_persistence => #{enable => true}},
override_env => [{boot_modules, [broker]}] override_env => [{boot_modules, [broker]}]
}} }}
], ],

View File

@ -194,7 +194,7 @@ keys() ->
emqx_config:get_root_names() -- hidden_roots(). emqx_config:get_root_names() -- hidden_roots().
drop_hidden_roots(Conf) -> drop_hidden_roots(Conf) ->
lists:foldl(fun(K, Acc) -> maps:remove(K, Acc) end, Conf, hidden_roots()). maps:without(hidden_roots(), Conf).
hidden_roots() -> hidden_roots() ->
[ [
@ -202,6 +202,7 @@ hidden_roots() ->
<<"stats">>, <<"stats">>,
<<"broker">>, <<"broker">>,
<<"persistent_session_store">>, <<"persistent_session_store">>,
<<"session_persistence">>,
<<"plugins">>, <<"plugins">>,
<<"zones">> <<"zones">>
]. ].

View File

@ -52,6 +52,7 @@
<<"limiter">>, <<"limiter">>,
<<"log">>, <<"log">>,
<<"persistent_session_store">>, <<"persistent_session_store">>,
<<"session_persistence">>,
<<"prometheus">>, <<"prometheus">>,
<<"crl_cache">>, <<"crl_cache">>,
<<"conn_congestion">>, <<"conn_congestion">>,

View File

@ -1555,4 +1555,17 @@ description.label:
description.desc: description.desc:
"""Descriptive text.""" """Descriptive text."""
session_persistence_enable.desc:
"""Use durable storage for client sessions persistence.
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime."""
session_persistence_storage.desc:
"""Durable storage backend to use for session persistence."""
session_storage_backend_enable.desc:
"""Enable this backend."""
session_storage_backend_builtin.desc:
"""Builtin session storage backend utilizing embedded RocksDB key-value store."""
} }