diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 6c5fdc56e..56246e743 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -79,7 +79,7 @@ cluster(#{n := N}) -> app_specs() -> [ emqx_durable_storage, - {emqx, "persistent_session_store = {ds = true}"} + {emqx, "session_persistence = {enable = true}"} ]. get_mqtt_port(Node, Type) -> diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 632ff2a27..82a345eef 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -19,7 +19,7 @@ -include("emqx.hrl"). -export([init/0]). --export([is_store_enabled/0]). +-export([is_persistence_enabled/0]). %% Message persistence -export([ @@ -28,9 +28,8 @@ -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -%% FIXME -define(WHEN_ENABLED(DO), - case is_store_enabled() of + case is_persistence_enabled() of true -> DO; false -> {skipped, disabled} end @@ -40,18 +39,26 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}} - }), + Backend = storage_backend(), + ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend), ok = emqx_persistent_session_ds_router:init_tables(), ok = emqx_persistent_session_ds:create_tables(), ok end). --spec is_store_enabled() -> boolean(). -is_store_enabled() -> - emqx_config:get([persistent_session_store, ds]). +-spec is_persistence_enabled() -> boolean(). +is_persistence_enabled() -> + emqx_config:get([session_persistence, enable]). + +-spec storage_backend() -> emqx_ds:create_db_opts(). +storage_backend() -> + storage_backend(emqx_config:get([session_persistence, storage])). + +storage_backend(#{builtin := #{enable := true}}) -> + #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}} + }. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3ad03c4d4..ef34d3e8f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -294,7 +294,19 @@ roots(low) -> {"persistent_session_store", sc( ref("persistent_session_store"), - #{importance => ?IMPORTANCE_HIDDEN} + #{ + %% NOTE + %% Due to some quirks in interaction between `emqx_config` and + %% `hocon_tconf`, schema roots cannot currently be deprecated. + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"session_persistence", + sc( + ref("session_persistence"), + #{ + importance => ?IMPORTANCE_HIDDEN + } )}, {"trace", sc( @@ -309,11 +321,12 @@ roots(low) -> ]. fields("persistent_session_store") -> + Deprecated = #{deprecated => {since, "5.4.0"}}, [ {"enabled", sc( boolean(), - #{ + Deprecated#{ default => false, %% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias aliases => [enable], @@ -323,7 +336,7 @@ fields("persistent_session_store") -> {"ds", sc( boolean(), - #{ + Deprecated#{ default => false, importance => ?IMPORTANCE_HIDDEN } @@ -331,7 +344,7 @@ fields("persistent_session_store") -> {"on_disc", sc( boolean(), - #{ + Deprecated#{ default => true, desc => ?DESC(persistent_store_on_disc) } @@ -339,7 +352,7 @@ fields("persistent_session_store") -> {"ram_cache", sc( boolean(), - #{ + Deprecated#{ default => false, desc => ?DESC(persistent_store_ram_cache) } @@ -347,7 +360,7 @@ fields("persistent_session_store") -> {"backend", sc( hoconsc:union([ref("persistent_session_builtin")]), - #{ + Deprecated#{ default => #{ <<"type">> => <<"builtin">>, <<"session">> => @@ -363,7 +376,7 @@ fields("persistent_session_store") -> {"max_retain_undelivered", sc( duration(), - #{ + Deprecated#{ default => <<"1h">>, desc => ?DESC(persistent_session_store_max_retain_undelivered) } @@ -371,7 +384,7 @@ fields("persistent_session_store") -> {"message_gc_interval", sc( duration(), - #{ + Deprecated#{ default => <<"1h">>, desc => ?DESC(persistent_session_store_message_gc_interval) } @@ -379,7 +392,7 @@ fields("persistent_session_store") -> {"session_message_gc_interval", sc( duration(), - #{ + Deprecated#{ default => <<"1m">>, desc => ?DESC(persistent_session_store_session_message_gc_interval) } @@ -1740,6 +1753,45 @@ fields("trace") -> importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(fields_trace_payload_encode) })} + ]; +fields("session_persistence") -> + [ + {"enable", + sc( + boolean(), #{ + desc => ?DESC(session_persistence_enable), + default => false + } + )}, + {"storage", + sc( + ref("session_storage_backend"), #{ + desc => ?DESC(session_persistence_storage), + validator => fun validate_backend_enabled/1, + default => #{ + <<"builtin">> => #{} + } + } + )} + ]; +fields("session_storage_backend") -> + [ + {"builtin", + sc(ref("session_storage_backend_builtin"), #{ + desc => ?DESC(session_storage_backend_builtin), + required => {false, recursively} + })} + ]; +fields("session_storage_backend_builtin") -> + [ + {"enable", + sc( + boolean(), + #{ + desc => ?DESC(session_storage_backend_enable), + default => true + } + )} ]. mqtt_listener(Bind) -> @@ -1992,6 +2044,8 @@ desc("ocsp") -> "Per listener OCSP Stapling configuration."; desc("crl_cache") -> "Global CRL cache options."; +desc("session_persistence") -> + "Settings governing durable sessions persistence."; desc(_) -> undefined. @@ -2014,6 +2068,17 @@ ensure_list(V) -> filter(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined]. +validate_backend_enabled(Config) -> + Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config), + case maps:to_list(Enabled) of + [{_Type, _BackendConfig}] -> + ok; + _Conflicts = [_ | _] -> + {error, multiple_enabled_backends}; + _None = [] -> + {error, no_enabled_backend} + end. + %% @private This function defines the SSL opts which are commonly used by %% SSL listener and client. -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema(). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 4bae4ce03..ba49d3f85 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -617,11 +617,11 @@ maybe_mock_impl_mod(_) -> -spec choose_impl_mod(conninfo()) -> module(). choose_impl_mod(#{expiry_interval := EI}) -> - hd(choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled())). + hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())). -spec choose_impl_candidates(conninfo()) -> [module()]. choose_impl_candidates(#{expiry_interval := EI}) -> - choose_impl_candidates(EI, emqx_persistent_message:is_store_enabled()). + choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()). choose_impl_candidates(_, _IsPSStoreEnabled = false) -> [emqx_session_mem]; diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 45cf85a05..922d7248f 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -291,7 +291,7 @@ publish(Node, Message) -> app_specs() -> [ emqx_durable_storage, - {emqx, "persistent_session_store {ds = true}"} + {emqx, "session_persistence {enable = true}"} ]. cluster() -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 77b625f05..f3af45fe0 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -35,8 +35,8 @@ all() -> % NOTE % Tests are disabled while existing session persistence impl is being % phased out. - {group, persistent_store_disabled}, - {group, persistent_store_ds} + {group, persistence_disabled}, + {group, persistence_enabled} ]. %% A persistent session can be resumed in two ways: @@ -54,24 +54,24 @@ groups() -> TCs = emqx_common_test_helpers:all(?MODULE), TCsNonGeneric = [t_choose_impl], [ - {persistent_store_disabled, [{group, no_kill_connection_process}]}, - {persistent_store_ds, [{group, no_kill_connection_process}]}, + {persistence_disabled, [{group, no_kill_connection_process}]}, + {persistence_enabled, [{group, no_kill_connection_process}]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {tcp, [], TCs}, {quic, [], TCs -- TCsNonGeneric}, {ws, [], TCs -- TCsNonGeneric} ]. -init_per_group(persistent_store_disabled, Config) -> +init_per_group(persistence_disabled, Config) -> [ - {emqx_config, "persistent_session_store { enabled = false }"}, - {persistent_store, false} + {emqx_config, "session_persistence { enable = false }"}, + {persistence, false} | Config ]; -init_per_group(persistent_store_ds, Config) -> +init_per_group(persistence_enabled, Config) -> [ - {emqx_config, "persistent_session_store { ds = true }"}, - {persistent_store, ds} + {emqx_config, "session_persistence { enable = true }"}, + {persistence, ds} | Config ]; init_per_group(Group, Config) when Group == tcp -> @@ -312,7 +312,7 @@ t_choose_impl(Config) -> {ok, _} = emqtt:ConnFun(Client), [ChanPid] = emqx_cm:lookup_channels(ClientId), ?assertEqual( - case ?config(persistent_store, Config) of + case ?config(persistence, Config) of false -> emqx_session_mem; ds -> emqx_persistent_session_ds end, @@ -878,7 +878,7 @@ t_multiple_subscription_matches(Config) -> ok = emqtt:disconnect(Client2). skip_ds_tc(Config) -> - case ?config(persistent_store, Config) of + case ?config(persistence, Config) of ds -> {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"}; _ -> diff --git a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl index 3e48173c3..cc50d66ee 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl @@ -38,7 +38,7 @@ init_per_suite(Config) -> AppSpecs = [ emqx_durable_storage, {emqx, #{ - config => #{persistent_session_store => #{ds => true}}, + config => #{session_persistence => #{enable => true}}, override_env => [{boot_modules, [broker]}] }} ], diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index fc00c7dc9..7e55ada4f 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -194,7 +194,7 @@ keys() -> emqx_config:get_root_names() -- hidden_roots(). drop_hidden_roots(Conf) -> - lists:foldl(fun(K, Acc) -> maps:remove(K, Acc) end, Conf, hidden_roots()). + maps:without(hidden_roots(), Conf). hidden_roots() -> [ @@ -202,6 +202,7 @@ hidden_roots() -> <<"stats">>, <<"broker">>, <<"persistent_session_store">>, + <<"session_persistence">>, <<"plugins">>, <<"zones">> ]. diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index e75e5f935..9825b26cf 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -52,6 +52,7 @@ <<"limiter">>, <<"log">>, <<"persistent_session_store">>, + <<"session_persistence">>, <<"prometheus">>, <<"crl_cache">>, <<"conn_congestion">>, diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 3eb816f3b..d12f6a2d1 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1555,4 +1555,17 @@ description.label: description.desc: """Descriptive text.""" +session_persistence_enable.desc: +"""Use durable storage for client sessions persistence. +If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.""" + +session_persistence_storage.desc: +"""Durable storage backend to use for session persistence.""" + +session_storage_backend_enable.desc: +"""Enable this backend.""" + +session_storage_backend_builtin.desc: +"""Builtin session storage backend utilizing embedded RocksDB key-value store.""" + }