Merge pull request #13035 from keynslug/fix/EMQX-12291/pers-sys-msg

fix(sessds): persist $SYS messages as well
This commit is contained in:
Andrew Mayorov 2024-05-14 10:05:49 +02:00 committed by GitHub
commit 98a93662c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 47 additions and 8 deletions

View File

@ -110,7 +110,7 @@ persist(Msg) ->
).
needs_persistence(Msg) ->
not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
not emqx_message:get_flag(dup, Msg).
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
store_message(Msg) ->

View File

@ -27,6 +27,7 @@
-compile(nowarn_export_all).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
%%--------------------------------------------------------------------
%% SUITE boilerplate
@ -66,19 +67,20 @@ groups() ->
init_per_group(persistence_disabled, Config) ->
[
{emqx_config, "session_persistence { enable = false }"},
{emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
{persistence, false}
| Config
];
init_per_group(persistence_enabled, Config) ->
[
{emqx_config,
"session_persistence {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
" session_gc_interval = 2s\n"
"}"},
?EMQX_CONFIG ++
"session_persistence {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
" session_gc_interval = 2s\n"
"}"},
{persistence, ds}
| Config
];
@ -1334,6 +1336,43 @@ do_t_will_message(Config, Opts) ->
),
ok.
t_sys_message_delivery(Config) ->
ConnFun = ?config(conn_fun, Config),
SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]),
SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]),
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config
]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]),
?assertMatch(
[
#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1},
#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2}
],
receive_messages(2)
),
ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config
]),
{ok, _} = emqtt:ConnFun(Client2),
?assertMatch(
[#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}],
receive_messages(1)
).
get_topicwise_order(Msgs) ->
maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).