From 4b3b29873a390849b62438a365619208052928c1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 3 Dec 2021 14:41:42 +0800 Subject: [PATCH 1/2] feat(retainer): allow to stop publish clear msg --- apps/emqx_retainer/etc/emqx_retainer.conf | 7 +++++++ apps/emqx_retainer/src/emqx_retainer.erl | 10 +++++++++- apps/emqx_retainer/src/emqx_retainer_schema.erl | 1 + 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index ba6bdfa6c..92dc62f24 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -29,6 +29,13 @@ emqx_retainer { ## Default: 0s msg_expiry_interval = 0s + ## When the retained flag of the PUBLISH message is set and Payload is empty, + ## whether to continue to publish the message. + ## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038 + ## + ## Default: false + #stop_publish_clear_msg = false + ## The message read and deliver flow rate control ## When a client subscribe to a wildcard topic, may many retained messages will be loaded. ## If you don't want these data loaded to the memory all at once, you can use this to control. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 5d248e638..9be449b60 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -88,7 +88,12 @@ on_message_publish(Msg = #message{flags = #{retain := true}, payload = <<>>}, Context) -> delete_message(Context, Topic), - {ok, Msg}; + case get_stop_publish_clear_msg() of + true -> + {ok, emqx_message:set_header(allow_publish, false, Msg)}; + _ -> + {ok, Msg} + end; on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> Msg1 = emqx_message:set_header(retained, true, Msg), @@ -157,6 +162,9 @@ get_expiry_time(#message{timestamp = Ts}) -> _ -> Ts + Interval end. +get_stop_publish_clear_msg() -> + emqx_conf:get([?APP, stop_publish_clear_msg], false). + -spec update_config(hocon:config()) -> ok. update_config(Conf) -> gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 55cfa2fcc..e1fa8373a 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -14,6 +14,7 @@ fields("emqx_retainer") -> , {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {max_payload_size, sc(emqx_schema:bytesize(), "1MB")} + , {stop_publish_clear_msg, sc(boolean(), false)} , {config, config()} ]; From f645a4eada91292b1b6ecd0f7914c107a78ea863 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 3 Dec 2021 14:42:27 +0800 Subject: [PATCH 2/2] test(retainer): add test case for stopping publish clear msg --- apps/emqx_retainer/test/emqx_retainer_SUITE.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index ccc647ddc..5596e9539 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -195,6 +195,21 @@ t_clean(_) -> ok = emqtt:disconnect(C1). +t_stop_publish_clear_msg(_) -> + emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + ?assertEqual(0, length(receive_messages(1))), + + emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => false}), + ok = emqtt:disconnect(C1). + t_flow_control(_) -> emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1, <<"msg_deliver_quota">> => 1,