Merge pull request #11988 from keynslug/ft/EMQX-11222/basic-config

feat(schema): introduce separate root for new session persistence
This commit is contained in:
Andrew Mayorov 2023-11-21 15:02:21 +07:00 committed by GitHub
commit cd48c283db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 124 additions and 37 deletions

View File

@ -79,7 +79,7 @@ cluster(#{n := N}) ->
app_specs() ->
[
emqx_durable_storage,
{emqx, "persistent_session_store = {ds = true}"}
{emqx, "session_persistence = {enable = true}"}
].
get_mqtt_port(Node, Type) ->

View File

@ -19,7 +19,7 @@
-include("emqx.hrl").
-export([init/0]).
-export([is_store_enabled/0]).
-export([is_persistence_enabled/0]).
%% Message persistence
-export([
@ -28,9 +28,8 @@
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
%% FIXME
-define(WHEN_ENABLED(DO),
case is_store_enabled() of
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
@ -40,18 +39,26 @@
init() ->
?WHEN_ENABLED(begin
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}
}),
Backend = storage_backend(),
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
ok = emqx_persistent_session_ds_router:init_tables(),
ok = emqx_persistent_session_ds:create_tables(),
ok
end).
-spec is_store_enabled() -> boolean().
is_store_enabled() ->
emqx_config:get([persistent_session_store, ds]).
-spec is_persistence_enabled() -> boolean().
is_persistence_enabled() ->
emqx_config:get([session_persistence, enable]).
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
storage_backend(emqx_config:get([session_persistence, storage])).
storage_backend(#{builtin := #{enable := true}}) ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}
}.
%%--------------------------------------------------------------------

View File

@ -294,7 +294,19 @@ roots(low) ->
{"persistent_session_store",
sc(
ref("persistent_session_store"),
#{importance => ?IMPORTANCE_HIDDEN}
#{
%% NOTE
%% Due to some quirks in interaction between `emqx_config` and
%% `hocon_tconf`, schema roots cannot currently be deprecated.
importance => ?IMPORTANCE_HIDDEN
}
)},
{"session_persistence",
sc(
ref("session_persistence"),
#{
importance => ?IMPORTANCE_HIDDEN
}
)},
{"trace",
sc(
@ -309,11 +321,12 @@ roots(low) ->
].
fields("persistent_session_store") ->
Deprecated = #{deprecated => {since, "5.4.0"}},
[
{"enabled",
sc(
boolean(),
#{
Deprecated#{
default => false,
%% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias
aliases => [enable],
@ -323,7 +336,7 @@ fields("persistent_session_store") ->
{"ds",
sc(
boolean(),
#{
Deprecated#{
default => false,
importance => ?IMPORTANCE_HIDDEN
}
@ -331,7 +344,7 @@ fields("persistent_session_store") ->
{"on_disc",
sc(
boolean(),
#{
Deprecated#{
default => true,
desc => ?DESC(persistent_store_on_disc)
}
@ -339,7 +352,7 @@ fields("persistent_session_store") ->
{"ram_cache",
sc(
boolean(),
#{
Deprecated#{
default => false,
desc => ?DESC(persistent_store_ram_cache)
}
@ -347,7 +360,7 @@ fields("persistent_session_store") ->
{"backend",
sc(
hoconsc:union([ref("persistent_session_builtin")]),
#{
Deprecated#{
default => #{
<<"type">> => <<"builtin">>,
<<"session">> =>
@ -363,7 +376,7 @@ fields("persistent_session_store") ->
{"max_retain_undelivered",
sc(
duration(),
#{
Deprecated#{
default => <<"1h">>,
desc => ?DESC(persistent_session_store_max_retain_undelivered)
}
@ -371,7 +384,7 @@ fields("persistent_session_store") ->
{"message_gc_interval",
sc(
duration(),
#{
Deprecated#{
default => <<"1h">>,
desc => ?DESC(persistent_session_store_message_gc_interval)
}
@ -379,7 +392,7 @@ fields("persistent_session_store") ->
{"session_message_gc_interval",
sc(
duration(),
#{
Deprecated#{
default => <<"1m">>,
desc => ?DESC(persistent_session_store_session_message_gc_interval)
}
@ -1740,6 +1753,45 @@ fields("trace") ->
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(fields_trace_payload_encode)
})}
];
fields("session_persistence") ->
[
{"enable",
sc(
boolean(), #{
desc => ?DESC(session_persistence_enable),
default => false
}
)},
{"storage",
sc(
ref("session_storage_backend"), #{
desc => ?DESC(session_persistence_storage),
validator => fun validate_backend_enabled/1,
default => #{
<<"builtin">> => #{}
}
}
)}
];
fields("session_storage_backend") ->
[
{"builtin",
sc(ref("session_storage_backend_builtin"), #{
desc => ?DESC(session_storage_backend_builtin),
required => {false, recursively}
})}
];
fields("session_storage_backend_builtin") ->
[
{"enable",
sc(
boolean(),
#{
desc => ?DESC(session_storage_backend_enable),
default => true
}
)}
].
mqtt_listener(Bind) ->
@ -1992,6 +2044,8 @@ desc("ocsp") ->
"Per listener OCSP Stapling configuration.";
desc("crl_cache") ->
"Global CRL cache options.";
desc("session_persistence") ->
"Settings governing durable sessions persistence.";
desc(_) ->
undefined.
@ -2014,6 +2068,17 @@ ensure_list(V) ->
filter(Opts) ->
[{K, V} || {K, V} <- Opts, V =/= undefined].
validate_backend_enabled(Config) ->
Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
case maps:to_list(Enabled) of
[{_Type, _BackendConfig}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end.
%% @private This function defines the SSL opts which are commonly used by
%% SSL listener and client.
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().

View File

@ -617,11 +617,11 @@ maybe_mock_impl_mod(_) ->
-spec choose_impl_mod(conninfo()) -> module().
choose_impl_mod(#{expiry_interval := EI}) ->
hd(choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled())).
hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())).
-spec choose_impl_candidates(conninfo()) -> [module()].
choose_impl_candidates(#{expiry_interval := EI}) ->
choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled()).
choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()).
choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
[emqx_session_mem];

View File

@ -291,7 +291,7 @@ publish(Node, Message) ->
app_specs() ->
[
emqx_durable_storage,
{emqx, "persistent_session_store {ds = true}"}
{emqx, "session_persistence {enable = true}"}
].
cluster() ->

View File

@ -35,8 +35,8 @@ all() ->
% NOTE
% Tests are disabled while existing session persistence impl is being
% phased out.
{group, persistent_store_disabled},
{group, persistent_store_ds}
{group, persistence_disabled},
{group, persistence_enabled}
].
%% A persistent session can be resumed in two ways:
@ -54,24 +54,24 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
TCsNonGeneric = [t_choose_impl],
[
{persistent_store_disabled, [{group, no_kill_connection_process}]},
{persistent_store_ds, [{group, no_kill_connection_process}]},
{persistence_disabled, [{group, no_kill_connection_process}]},
{persistence_enabled, [{group, no_kill_connection_process}]},
{no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
{tcp, [], TCs},
{quic, [], TCs -- TCsNonGeneric},
{ws, [], TCs -- TCsNonGeneric}
].
init_per_group(persistent_store_disabled, Config) ->
init_per_group(persistence_disabled, Config) ->
[
{emqx_config, "persistent_session_store { enabled = false }"},
{persistent_store, false}
{emqx_config, "session_persistence { enable = false }"},
{persistence, false}
| Config
];
init_per_group(persistent_store_ds, Config) ->
init_per_group(persistence_enabled, Config) ->
[
{emqx_config, "persistent_session_store { ds = true }"},
{persistent_store, ds}
{emqx_config, "session_persistence { enable = true }"},
{persistence, ds}
| Config
];
init_per_group(Group, Config) when Group == tcp ->
@ -312,7 +312,7 @@ t_choose_impl(Config) ->
{ok, _} = emqtt:ConnFun(Client),
[ChanPid] = emqx_cm:lookup_channels(ClientId),
?assertEqual(
case ?config(persistent_store, Config) of
case ?config(persistence, Config) of
false -> emqx_session_mem;
ds -> emqx_persistent_session_ds
end,
@ -878,7 +878,7 @@ t_multiple_subscription_matches(Config) ->
ok = emqtt:disconnect(Client2).
skip_ds_tc(Config) ->
case ?config(persistent_store, Config) of
case ?config(persistence, Config) of
ds ->
{skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
_ ->

View File

@ -38,7 +38,7 @@ init_per_suite(Config) ->
AppSpecs = [
emqx_durable_storage,
{emqx, #{
config => #{persistent_session_store => #{ds => true}},
config => #{session_persistence => #{enable => true}},
override_env => [{boot_modules, [broker]}]
}}
],

View File

@ -194,7 +194,7 @@ keys() ->
emqx_config:get_root_names() -- hidden_roots().
drop_hidden_roots(Conf) ->
lists:foldl(fun(K, Acc) -> maps:remove(K, Acc) end, Conf, hidden_roots()).
maps:without(hidden_roots(), Conf).
hidden_roots() ->
[
@ -202,6 +202,7 @@ hidden_roots() ->
<<"stats">>,
<<"broker">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"plugins">>,
<<"zones">>
].

View File

@ -52,6 +52,7 @@
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"prometheus">>,
<<"crl_cache">>,
<<"conn_congestion">>,

View File

@ -1555,4 +1555,17 @@ description.label:
description.desc:
"""Descriptive text."""
session_persistence_enable.desc:
"""Use durable storage for client sessions persistence.
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime."""
session_persistence_storage.desc:
"""Durable storage backend to use for session persistence."""
session_storage_backend_enable.desc:
"""Enable this backend."""
session_storage_backend_builtin.desc:
"""Builtin session storage backend utilizing embedded RocksDB key-value store."""
}