From 66b7ac4c45d833f25ce688402f1309096f7eea45 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 May 2024 21:25:00 +0200 Subject: [PATCH] fix(sessds): persist $SYS messages as well Otherwise, persistent sessions will not be able to receive $SYS messages whatsoever. --- apps/emqx/src/emqx_persistent_message.erl | 2 +- .../test/emqx_persistent_session_SUITE.erl | 53 ++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index c909c5c5f..dc991619b 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -110,7 +110,7 @@ persist(Msg) -> ). needs_persistence(Msg) -> - not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). + not emqx_message:get_flag(dup, Msg). -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 674e1a8d9..f0b783250 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -27,6 +27,7 @@ -compile(nowarn_export_all). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n"). %%-------------------------------------------------------------------- %% SUITE boilerplate @@ -66,19 +67,20 @@ groups() -> init_per_group(persistence_disabled, Config) -> [ - {emqx_config, "session_persistence { enable = false }"}, + {emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"}, {persistence, false} | Config ]; init_per_group(persistence_enabled, Config) -> [ {emqx_config, - "session_persistence {\n" - " enable = true\n" - " last_alive_update_interval = 100ms\n" - " renew_streams_interval = 100ms\n" - " session_gc_interval = 2s\n" - "}"}, + ?EMQX_CONFIG ++ + "session_persistence {\n" + " enable = true\n" + " last_alive_update_interval = 100ms\n" + " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" + "}"}, {persistence, ds} | Config ]; @@ -1334,6 +1336,43 @@ do_t_will_message(Config, Opts) -> ), ok. +t_sys_message_delivery(Config) -> + ConnFun = ?config(conn_fun, Config), + SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]), + SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]), + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]), + ?assertMatch( + [ + #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1}, + #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2} + ], + receive_messages(2) + ), + + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertMatch( + [#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}], + receive_messages(1) + ). + get_topicwise_order(Msgs) -> maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).