Merge pull request #13060 from ieQu1/dev/ds-better-naming

Better naming of DS database and configuration
This commit is contained in:
ieQu1 2024-05-16 22:03:23 +02:00 committed by GitHub
commit 93f1a158f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 101 additions and 76 deletions

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PERSISTENT_MESSAGE_HRL).
-define(EMQX_PERSISTENT_MESSAGE_HRL, true).
-define(PERSISTENT_MESSAGE_DB, messages).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WITH_DURABILITY_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
-endif.

View File

@ -56,7 +56,7 @@ init_per_testcase(t_session_gc = TestCase, Config) ->
n => 3,
roles => [core, core, core],
extra_emqx_conf =>
"\n session_persistence {"
"\n durable_sessions {"
"\n last_alive_update_interval = 500ms "
"\n session_gc_interval = 1s "
"\n session_gc_batch_size = 2 "
@ -116,7 +116,7 @@ app_specs() ->
app_specs(_Opts = #{}).
app_specs(Opts) ->
DefaultEMQXConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
DefaultEMQXConf = "durable_sessions {enable = true, renew_streams_interval = 1s}",
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
{emqx, DefaultEMQXConf ++ ExtraEMQXConf}

View File

@ -32,15 +32,7 @@
persist/1
]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WHEN_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
-include("emqx_persistent_message.hrl").
%%--------------------------------------------------------------------
@ -51,7 +43,7 @@ init() ->
Zones = maps:keys(emqx_config:get([zones])),
IsEnabled = lists:any(fun is_persistence_enabled/1, Zones),
persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
?WHEN_ENABLED(begin
?WITH_DURABILITY_ENABLED(begin
?SLOG(notice, #{msg => "Session durability is enabled"}),
Backend = storage_backend(),
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
@ -66,7 +58,7 @@ is_persistence_enabled() ->
-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
is_persistence_enabled(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, enable]).
emqx_config:get_zone_conf(Zone, [durable_sessions, enable]).
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
@ -76,7 +68,7 @@ storage_backend() ->
%% `emqx_persistent_session_ds':
-spec force_ds(emqx_types:zone()) -> boolean().
force_ds(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]).
emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
storage_backend(Path) ->
ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
@ -86,12 +78,12 @@ storage_backend(Path) ->
-spec add_handler() -> ok.
add_handler() ->
emqx_config_handler:add_handler([session_persistence], ?MODULE).
emqx_config_handler:add_handler([durable_sessions], ?MODULE).
pre_config_update([session_persistence], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
pre_config_update([durable_sessions], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
New =/= Old
->
{error, "Hot update of session_persistence.enable parameter is currently not supported"};
{error, "Hot update of durable_sessions.enable parameter is currently not supported"};
pre_config_update(_Root, _NewConf, _OldConf) ->
ok.
@ -100,7 +92,7 @@ pre_config_update(_Root, _NewConf, _OldConf) ->
-spec persist(emqx_types:message()) ->
emqx_ds:store_batch_result() | {skipped, needs_no_persistence}.
persist(Msg) ->
?WHEN_ENABLED(
?WITH_DURABILITY_ENABLED(
case needs_persistence(Msg) andalso has_subscribers(Msg) of
true ->
store_message(Msg);

View File

@ -81,7 +81,7 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------------------
ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, message_retention_period]),
Timeout = emqx_config:get([durable_sessions, message_retention_period]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.
@ -114,7 +114,7 @@ now_ms() ->
maybe_gc() ->
AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
NowMS = now_ms(),
RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
RetentionPeriod = emqx_config:get([durable_sessions, message_retention_period]),
TimeThreshold = NowMS - RetentionPeriod,
maybe_create_new_generation(AllGens, TimeThreshold),
?tp_span(

View File

@ -102,6 +102,6 @@ tally_persistent_subscriptions(State0) ->
State0#{subs_count := N}.
ensure_subs_tally_timer() ->
Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]),
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
ok.

View File

@ -1150,10 +1150,10 @@ expiry_interval(ConnInfo) ->
%% zone, since the GC process is responsible for all sessions
%% regardless of the zone.
bump_interval() ->
emqx_config:get([session_persistence, last_alive_update_interval]).
emqx_config:get([durable_sessions, last_alive_update_interval]).
get_config(#{zone := Zone}, Key) ->
emqx_config:get_zone_conf(Zone, [session_persistence | Key]).
emqx_config:get_zone_conf(Zone, [durable_sessions | Key]).
-spec try_get_live_session(emqx_types:clientid()) ->
{pid(), session()} | not_found | not_persistent.

View File

@ -16,7 +16,7 @@
-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
-define(SESSION_TAB, emqx_ds_session).
-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).

View File

@ -93,7 +93,7 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------------------
ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, session_gc_interval]),
Timeout = emqx_config:get([durable_sessions, session_gc_interval]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.
@ -133,8 +133,8 @@ start_gc() ->
).
gc_context() ->
GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
GCInterval = emqx_config:get([durable_sessions, session_gc_interval]),
BumpInterval = emqx_config:get([durable_sessions, last_alive_update_interval]),
TimeThreshold = max(GCInterval, BumpInterval) * 3,
NowMS = now_ms(),
#{
@ -149,7 +149,7 @@ gc_context() ->
}.
gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) ->
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
GCBatchSize = emqx_config:get([durable_sessions, session_gc_batch_size]),
case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
{[], _It} ->
ok;

View File

@ -303,9 +303,9 @@ roots(low) ->
converter => fun flapping_detect_converter/2
}
)},
{session_persistence,
{durable_sessions,
sc(
ref("session_persistence"),
ref("durable_sessions"),
#{
importance => ?IMPORTANCE_HIDDEN
}
@ -1652,12 +1652,12 @@ fields("trace") ->
desc => ?DESC(fields_trace_payload_encode)
})}
];
fields("session_persistence") ->
fields("durable_sessions") ->
[
{"enable",
sc(
boolean(), #{
desc => ?DESC(session_persistence_enable),
desc => ?DESC(durable_sessions_enable),
default => false
}
)},
@ -2091,7 +2091,7 @@ desc("ocsp") ->
"Per listener OCSP Stapling configuration.";
desc("crl_cache") ->
"Global CRL cache options.";
desc("session_persistence") ->
desc("durable_sessions") ->
"Settings governing durable sessions persistence.";
desc(durable_storage) ->
?DESC(durable_storage);

View File

@ -34,7 +34,7 @@ roots() ->
conn_congestion,
force_gc,
overload_protection,
session_persistence
durable_sessions
].
zones_without_default() ->

View File

@ -465,7 +465,7 @@ zone_global_defaults() ->
enable => false
},
stats => #{enable => true},
session_persistence =>
durable_sessions =>
#{
enable => false,
batch_size => 100,

View File

@ -27,7 +27,7 @@
-import(emqx_common_test_helpers, [on_exit/1]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -46,7 +46,7 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n session_persistence.message_retention_period = 3s"
"\n durable_sessions.message_retention_period = 3s"
"\n durable_storage.messages.n_shards = 3"
},
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
@ -554,7 +554,7 @@ app_specs(Opts) ->
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
emqx_durable_storage,
{emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
{emqx, "durable_sessions {enable = true}" ++ ExtraEMQXConf}
].
cluster() ->

View File

@ -26,7 +26,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
%%--------------------------------------------------------------------
@ -67,7 +68,7 @@ groups() ->
init_per_group(persistence_disabled, Config) ->
[
{emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
{emqx_config, ?EMQX_CONFIG ++ "durable_sessions { enable = false }"},
{persistence, false}
| Config
];
@ -75,7 +76,7 @@ init_per_group(persistence_enabled, Config) ->
[
{emqx_config,
?EMQX_CONFIG ++
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"

View File

@ -38,7 +38,7 @@ init_per_suite(Config) ->
AppSpecs = [
emqx_durable_storage,
{emqx, #{
config => #{session_persistence => #{enable => true}},
config => #{durable_sessions => #{enable => true}},
override_env => [{boot_modules, [broker]}]
}}
],

View File

@ -69,7 +69,7 @@ init_per_group(persistence_enabled = Group, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence = {\n"
"durable_sessions = {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
@ -85,7 +85,7 @@ init_per_group(persistence_enabled = Group, Config) ->
];
init_per_group(persistence_disabled = Group, Config) ->
Apps = emqx_cth_suite:start(
[{emqx, "session_persistence.enable = false"}],
[{emqx, "durable_sessions.enable = false"}],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
[

View File

@ -208,7 +208,7 @@ hidden_roots() ->
<<"stats">>,
<<"broker">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"durable_sessions">>,
<<"plugins">>,
<<"zones">>
].

View File

@ -82,7 +82,7 @@ init_per_group(persistent_sessions = Group, Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
{emqx, "session_persistence {enable = true}"},
{emqx, "durable_sessions {enable = true}"},
{emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(

View File

@ -186,7 +186,7 @@ prometheus_per_db(NodeOrAggr) ->
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_store_batch_time =>
%% [{[{db, emqx_persistent_message}], 42}],
%% [{[{db, messages}], 42}],
%% ...
%% '''
%%
@ -222,11 +222,11 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) ->
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_egress_batches =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}],
%% [{[{db,messages},{shard,<<"1">>}],99408},
%% {[{db,messages},{shard,<<"0">>}],99409}],
%% emqx_ds_egress_batches_retry =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}],
%% [{[{db,messages},{shard,<<"1">>}],0},
%% {[{db,messages},{shard,<<"0">>}],0}],
%% emqx_ds_egress_messages =>
%% ...
%% }

View File

@ -21,6 +21,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
@ -222,7 +223,7 @@ fields(sites_shard) ->
atom(),
#{
desc => <<"Durable storage ID">>,
example => 'emqx_persistent_message'
example => ?PERSISTENT_MESSAGE_DB
}
)},
{id,
@ -249,7 +250,7 @@ fields(db) ->
atom(),
#{
desc => <<"Name of the durable storage">>,
example => 'emqx_persistent_message'
example => ?PERSISTENT_MESSAGE_DB
}
)},
{shards,
@ -403,7 +404,7 @@ param_storage_id() ->
required => true,
in => path,
desc => <<"Durable storage ID">>,
example => emqx_persistent_message
example => ?PERSISTENT_MESSAGE_DB
},
{ds, mk(enum(dbs()), Info)}.
@ -416,7 +417,7 @@ example_site() ->
end.
dbs() ->
[emqx_persistent_message].
[?PERSISTENT_MESSAGE_DB].
shards_of_site(Site) ->
lists:flatmap(

View File

@ -65,7 +65,7 @@
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"durable_sessions">>,
<<"prometheus">>,
<<"crl_cache">>,
<<"conn_congestion">>,

View File

@ -46,7 +46,7 @@ groups() ->
init_per_group(persistence_disabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence { enable = false }"},
{emqx, "durable_sessions { enable = false }"},
emqx_management
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
@ -59,7 +59,7 @@ init_per_group(persistence_enabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"

View File

@ -79,7 +79,7 @@ end_per_suite(Config) ->
init_per_group(persistent_sessions, Config) ->
AppSpecs = [
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management
],
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(

View File

@ -29,7 +29,7 @@ all() ->
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
@ -59,7 +59,7 @@ t_get_storages(_) ->
Path = api_path(["ds", "storages"]),
{ok, Response} = request_api(get, Path),
?assertEqual(
[<<"emqx_persistent_message">>],
[<<"messages">>],
emqx_utils_json:decode(Response, [return_maps])
).
@ -81,7 +81,7 @@ t_get_site(_) ->
<<"shards">> :=
[
#{
<<"storage">> := <<"emqx_persistent_message">>,
<<"storage">> := <<"messages">>,
<<"id">> := _,
<<"status">> := <<"up">>
}
@ -99,12 +99,12 @@ t_get_db(_) ->
request_api(get, Path400)
),
%% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message"]),
Path = api_path(["ds", "storages", "messages"]),
{ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertMatch(
#{
<<"name">> := <<"emqx_persistent_message">>,
<<"name">> := <<"messages">>,
<<"shards">> :=
[
#{
@ -132,7 +132,7 @@ t_get_replicas(_) ->
request_api(get, Path400)
),
%% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
Path = api_path(["ds", "storages", "messages", "replicas"]),
{ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertEqual(
@ -141,7 +141,7 @@ t_get_replicas(_) ->
).
t_put_replicas(_) ->
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
Path = api_path(["ds", "storages", "messages", "replicas"]),
%% Error cases:
?assertMatch(
{ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}},
@ -154,13 +154,13 @@ t_put_replicas(_) ->
).
t_join(_) ->
Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]),
Path400 = api_path(["ds", "storages", "messages", "replicas", "unknown_site"]),
?assertMatch(
{error, {_, 400, _}},
parse_error(request_api(put, Path400))
),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch(
{ok, "OK"},
request_api(put, Path)
@ -168,7 +168,7 @@ t_join(_) ->
t_leave(_) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch(
{error, {_, 400, _}},
request_api(delete, Path)
@ -176,7 +176,7 @@ t_leave(_) ->
t_leave_notfound(_) ->
Site = "not_part_of_replica_set",
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]),
Path = api_path(["ds", "storages", "messages", "replicas", Site]),
?assertMatch(
{error, {_, 404, _}},
request_api(delete, Path)

View File

@ -61,7 +61,7 @@ init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" renew_streams_interval = 10ms\n"
"}"},

View File

@ -27,7 +27,7 @@ all() ->
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],

View File

@ -1,2 +1,2 @@
Make it possible to override `session_persistence` settings per zone.
Make it possible to override `durable_sessions` settings per zone.
Since durable sessions are inherently more expensive to maintain than the regular sessions, it's desirable to grant the operator finer control of session durability for different classes of clients.

View File

@ -0,0 +1,2 @@
- Rename durable storage for MQTT messages from `emqx_persistent_message` to `messages`
- Rename configuration root from `session_persistence` to `durable_sessions`

View File

@ -1207,7 +1207,7 @@ base_listener_zone.desc: """~
- `force_shutdown`
- `force_gc`
- `flapping_detect`
- `session_persistence`"""
- `durable_sessions`"""
base_listener_zone.label: "Zone"
@ -1544,10 +1544,10 @@ resource_tags.label:
resource_tags.desc:
"""Tags to annotate this config entry."""
session_persistence_enable.label:
durable_sessions_enable.label:
"""Enable session persistence"""
session_persistence_enable.desc:
durable_sessions_enable.desc:
"""Use durable storage for client sessions persistence.
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.