fix(sessds): Rename session_persistence to durable_sessions
This commit is contained in:
parent
ee6e7174cf
commit
73f17249e9
|
@ -56,7 +56,7 @@ init_per_testcase(t_session_gc = TestCase, Config) ->
|
|||
n => 3,
|
||||
roles => [core, core, core],
|
||||
extra_emqx_conf =>
|
||||
"\n session_persistence {"
|
||||
"\n durable_sessions {"
|
||||
"\n last_alive_update_interval = 500ms "
|
||||
"\n session_gc_interval = 1s "
|
||||
"\n session_gc_batch_size = 2 "
|
||||
|
@ -116,7 +116,7 @@ app_specs() ->
|
|||
app_specs(_Opts = #{}).
|
||||
|
||||
app_specs(Opts) ->
|
||||
DefaultEMQXConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
|
||||
DefaultEMQXConf = "durable_sessions {enable = true, renew_streams_interval = 1s}",
|
||||
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
|
||||
[
|
||||
{emqx, DefaultEMQXConf ++ ExtraEMQXConf}
|
||||
|
|
|
@ -58,7 +58,7 @@ is_persistence_enabled() ->
|
|||
|
||||
-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
|
||||
is_persistence_enabled(Zone) ->
|
||||
emqx_config:get_zone_conf(Zone, [session_persistence, enable]).
|
||||
emqx_config:get_zone_conf(Zone, [durable_sessions, enable]).
|
||||
|
||||
-spec storage_backend() -> emqx_ds:create_db_opts().
|
||||
storage_backend() ->
|
||||
|
@ -68,7 +68,7 @@ storage_backend() ->
|
|||
%% `emqx_persistent_session_ds':
|
||||
-spec force_ds(emqx_types:zone()) -> boolean().
|
||||
force_ds(Zone) ->
|
||||
emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]).
|
||||
emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
|
||||
|
||||
storage_backend(Path) ->
|
||||
ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
|
||||
|
@ -78,12 +78,12 @@ storage_backend(Path) ->
|
|||
|
||||
-spec add_handler() -> ok.
|
||||
add_handler() ->
|
||||
emqx_config_handler:add_handler([session_persistence], ?MODULE).
|
||||
emqx_config_handler:add_handler([durable_sessions], ?MODULE).
|
||||
|
||||
pre_config_update([session_persistence], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
|
||||
pre_config_update([durable_sessions], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
|
||||
New =/= Old
|
||||
->
|
||||
{error, "Hot update of session_persistence.enable parameter is currently not supported"};
|
||||
{error, "Hot update of durable_sessions.enable parameter is currently not supported"};
|
||||
pre_config_update(_Root, _NewConf, _OldConf) ->
|
||||
ok.
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ handle_info(_Info, State) ->
|
|||
%%--------------------------------------------------------------------------------
|
||||
|
||||
ensure_gc_timer() ->
|
||||
Timeout = emqx_config:get([session_persistence, message_retention_period]),
|
||||
Timeout = emqx_config:get([durable_sessions, message_retention_period]),
|
||||
_ = erlang:send_after(Timeout, self(), #gc{}),
|
||||
ok.
|
||||
|
||||
|
@ -114,7 +114,7 @@ now_ms() ->
|
|||
maybe_gc() ->
|
||||
AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
|
||||
NowMS = now_ms(),
|
||||
RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
|
||||
RetentionPeriod = emqx_config:get([durable_sessions, message_retention_period]),
|
||||
TimeThreshold = NowMS - RetentionPeriod,
|
||||
maybe_create_new_generation(AllGens, TimeThreshold),
|
||||
?tp_span(
|
||||
|
|
|
@ -102,6 +102,6 @@ tally_persistent_subscriptions(State0) ->
|
|||
State0#{subs_count := N}.
|
||||
|
||||
ensure_subs_tally_timer() ->
|
||||
Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]),
|
||||
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
|
||||
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
|
||||
ok.
|
||||
|
|
|
@ -1150,10 +1150,10 @@ expiry_interval(ConnInfo) ->
|
|||
%% zone, since the GC process is responsible for all sessions
|
||||
%% regardless of the zone.
|
||||
bump_interval() ->
|
||||
emqx_config:get([session_persistence, last_alive_update_interval]).
|
||||
emqx_config:get([durable_sessions, last_alive_update_interval]).
|
||||
|
||||
get_config(#{zone := Zone}, Key) ->
|
||||
emqx_config:get_zone_conf(Zone, [session_persistence | Key]).
|
||||
emqx_config:get_zone_conf(Zone, [durable_sessions | Key]).
|
||||
|
||||
-spec try_get_live_session(emqx_types:clientid()) ->
|
||||
{pid(), session()} | not_found | not_persistent.
|
||||
|
|
|
@ -93,7 +93,7 @@ handle_info(_Info, State) ->
|
|||
%%--------------------------------------------------------------------------------
|
||||
|
||||
ensure_gc_timer() ->
|
||||
Timeout = emqx_config:get([session_persistence, session_gc_interval]),
|
||||
Timeout = emqx_config:get([durable_sessions, session_gc_interval]),
|
||||
_ = erlang:send_after(Timeout, self(), #gc{}),
|
||||
ok.
|
||||
|
||||
|
@ -133,8 +133,8 @@ start_gc() ->
|
|||
).
|
||||
|
||||
gc_context() ->
|
||||
GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
|
||||
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
|
||||
GCInterval = emqx_config:get([durable_sessions, session_gc_interval]),
|
||||
BumpInterval = emqx_config:get([durable_sessions, last_alive_update_interval]),
|
||||
TimeThreshold = max(GCInterval, BumpInterval) * 3,
|
||||
NowMS = now_ms(),
|
||||
#{
|
||||
|
@ -149,7 +149,7 @@ gc_context() ->
|
|||
}.
|
||||
|
||||
gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) ->
|
||||
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
|
||||
GCBatchSize = emqx_config:get([durable_sessions, session_gc_batch_size]),
|
||||
case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
|
||||
{[], _It} ->
|
||||
ok;
|
||||
|
|
|
@ -303,9 +303,9 @@ roots(low) ->
|
|||
converter => fun flapping_detect_converter/2
|
||||
}
|
||||
)},
|
||||
{session_persistence,
|
||||
{durable_sessions,
|
||||
sc(
|
||||
ref("session_persistence"),
|
||||
ref("durable_sessions"),
|
||||
#{
|
||||
importance => ?IMPORTANCE_HIDDEN
|
||||
}
|
||||
|
@ -1652,12 +1652,12 @@ fields("trace") ->
|
|||
desc => ?DESC(fields_trace_payload_encode)
|
||||
})}
|
||||
];
|
||||
fields("session_persistence") ->
|
||||
fields("durable_sessions") ->
|
||||
[
|
||||
{"enable",
|
||||
sc(
|
||||
boolean(), #{
|
||||
desc => ?DESC(session_persistence_enable),
|
||||
desc => ?DESC(durable_sessions_enable),
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
|
@ -2091,7 +2091,7 @@ desc("ocsp") ->
|
|||
"Per listener OCSP Stapling configuration.";
|
||||
desc("crl_cache") ->
|
||||
"Global CRL cache options.";
|
||||
desc("session_persistence") ->
|
||||
desc("durable_sessions") ->
|
||||
"Settings governing durable sessions persistence.";
|
||||
desc(durable_storage) ->
|
||||
?DESC(durable_storage);
|
||||
|
|
|
@ -34,7 +34,7 @@ roots() ->
|
|||
conn_congestion,
|
||||
force_gc,
|
||||
overload_protection,
|
||||
session_persistence
|
||||
durable_sessions
|
||||
].
|
||||
|
||||
zones_without_default() ->
|
||||
|
|
|
@ -465,7 +465,7 @@ zone_global_defaults() ->
|
|||
enable => false
|
||||
},
|
||||
stats => #{enable => true},
|
||||
session_persistence =>
|
||||
durable_sessions =>
|
||||
#{
|
||||
enable => false,
|
||||
batch_size => 100,
|
||||
|
|
|
@ -46,7 +46,7 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
|
|||
init_per_testcase(t_message_gc = TestCase, Config) ->
|
||||
Opts = #{
|
||||
extra_emqx_conf =>
|
||||
"\n session_persistence.message_retention_period = 3s"
|
||||
"\n durable_sessions.message_retention_period = 3s"
|
||||
"\n durable_storage.messages.n_shards = 3"
|
||||
},
|
||||
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
|
||||
|
@ -554,7 +554,7 @@ app_specs(Opts) ->
|
|||
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
|
||||
[
|
||||
emqx_durable_storage,
|
||||
{emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
|
||||
{emqx, "durable_sessions {enable = true}" ++ ExtraEMQXConf}
|
||||
].
|
||||
|
||||
cluster() ->
|
||||
|
|
|
@ -68,7 +68,7 @@ groups() ->
|
|||
|
||||
init_per_group(persistence_disabled, Config) ->
|
||||
[
|
||||
{emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
|
||||
{emqx_config, ?EMQX_CONFIG ++ "durable_sessions { enable = false }"},
|
||||
{persistence, false}
|
||||
| Config
|
||||
];
|
||||
|
@ -76,7 +76,7 @@ init_per_group(persistence_enabled, Config) ->
|
|||
[
|
||||
{emqx_config,
|
||||
?EMQX_CONFIG ++
|
||||
"session_persistence {\n"
|
||||
"durable_sessions {\n"
|
||||
" enable = true\n"
|
||||
" last_alive_update_interval = 100ms\n"
|
||||
" renew_streams_interval = 100ms\n"
|
||||
|
|
|
@ -38,7 +38,7 @@ init_per_suite(Config) ->
|
|||
AppSpecs = [
|
||||
emqx_durable_storage,
|
||||
{emqx, #{
|
||||
config => #{session_persistence => #{enable => true}},
|
||||
config => #{durable_sessions => #{enable => true}},
|
||||
override_env => [{boot_modules, [broker]}]
|
||||
}}
|
||||
],
|
||||
|
|
|
@ -69,7 +69,7 @@ init_per_group(persistence_enabled = Group, Config) ->
|
|||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx,
|
||||
"session_persistence = {\n"
|
||||
"durable_sessions = {\n"
|
||||
" enable = true\n"
|
||||
" last_alive_update_interval = 100ms\n"
|
||||
" renew_streams_interval = 100ms\n"
|
||||
|
@ -85,7 +85,7 @@ init_per_group(persistence_enabled = Group, Config) ->
|
|||
];
|
||||
init_per_group(persistence_disabled = Group, Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[{emqx, "session_persistence.enable = false"}],
|
||||
[{emqx, "durable_sessions.enable = false"}],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
|
||||
),
|
||||
[
|
||||
|
|
|
@ -208,7 +208,7 @@ hidden_roots() ->
|
|||
<<"stats">>,
|
||||
<<"broker">>,
|
||||
<<"persistent_session_store">>,
|
||||
<<"session_persistence">>,
|
||||
<<"durable_sessions">>,
|
||||
<<"plugins">>,
|
||||
<<"zones">>
|
||||
].
|
||||
|
|
|
@ -82,7 +82,7 @@ init_per_group(persistent_sessions = Group, Config) ->
|
|||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
emqx_conf,
|
||||
{emqx, "session_persistence {enable = true}"},
|
||||
{emqx, "durable_sessions {enable = true}"},
|
||||
{emqx_retainer, ?BASE_RETAINER_CONF},
|
||||
emqx_management,
|
||||
emqx_mgmt_api_test_util:emqx_dashboard(
|
||||
|
|
|
@ -65,7 +65,7 @@
|
|||
<<"limiter">>,
|
||||
<<"log">>,
|
||||
<<"persistent_session_store">>,
|
||||
<<"session_persistence">>,
|
||||
<<"durable_sessions">>,
|
||||
<<"prometheus">>,
|
||||
<<"crl_cache">>,
|
||||
<<"conn_congestion">>,
|
||||
|
|
|
@ -46,7 +46,7 @@ groups() ->
|
|||
init_per_group(persistence_disabled, Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx, "session_persistence { enable = false }"},
|
||||
{emqx, "durable_sessions { enable = false }"},
|
||||
emqx_management
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
|
@ -59,7 +59,7 @@ init_per_group(persistence_enabled, Config) ->
|
|||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx,
|
||||
"session_persistence {\n"
|
||||
"durable_sessions {\n"
|
||||
" enable = true\n"
|
||||
" last_alive_update_interval = 100ms\n"
|
||||
" renew_streams_interval = 100ms\n"
|
||||
|
|
|
@ -79,7 +79,7 @@ end_per_suite(Config) ->
|
|||
|
||||
init_per_group(persistent_sessions, Config) ->
|
||||
AppSpecs = [
|
||||
{emqx, "session_persistence.enable = true"},
|
||||
{emqx, "durable_sessions.enable = true"},
|
||||
emqx_management
|
||||
],
|
||||
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
|
||||
|
|
|
@ -29,7 +29,7 @@ all() ->
|
|||
init_per_suite(Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx, "session_persistence.enable = true"},
|
||||
{emqx, "durable_sessions.enable = true"},
|
||||
emqx_management,
|
||||
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||
],
|
||||
|
|
|
@ -61,7 +61,7 @@ init_per_suite(Config) ->
|
|||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx,
|
||||
"session_persistence {\n"
|
||||
"durable_sessions {\n"
|
||||
" enable = true\n"
|
||||
" renew_streams_interval = 10ms\n"
|
||||
"}"},
|
||||
|
|
|
@ -27,7 +27,7 @@ all() ->
|
|||
init_per_suite(Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx, "session_persistence.enable = true"},
|
||||
{emqx, "durable_sessions.enable = true"},
|
||||
emqx_management,
|
||||
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||
],
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
Make it possible to override `session_persistence` settings per zone.
|
||||
Make it possible to override `durable_sessions` settings per zone.
|
||||
Since durable sessions are inherently more expensive to maintain than the regular sessions, it's desirable to grant the operator finer control of session durability for different classes of clients.
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
- Rename durable storage for MQTT messages from `emqx_persistent_message` to `messages`
|
||||
- Rename configuration root from `session_persistence` to `durable_sessions`
|
|
@ -1207,7 +1207,7 @@ base_listener_zone.desc: """~
|
|||
- `force_shutdown`
|
||||
- `force_gc`
|
||||
- `flapping_detect`
|
||||
- `session_persistence`"""
|
||||
- `durable_sessions`"""
|
||||
|
||||
base_listener_zone.label: "Zone"
|
||||
|
||||
|
@ -1544,10 +1544,10 @@ resource_tags.label:
|
|||
resource_tags.desc:
|
||||
"""Tags to annotate this config entry."""
|
||||
|
||||
session_persistence_enable.label:
|
||||
durable_sessions_enable.label:
|
||||
"""Enable session persistence"""
|
||||
|
||||
session_persistence_enable.desc:
|
||||
durable_sessions_enable.desc:
|
||||
"""Use durable storage for client sessions persistence.
|
||||
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.
|
||||
|
||||
|
|
Loading…
Reference in New Issue