From 8aaa2e8333bbfbe49752fba8b9b2f28d0f3b7703 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 1 Dec 2021 17:00:07 +0800 Subject: [PATCH 1/3] feat(retainer): add option to stop publish clear message --- apps/emqx_retainer/etc/emqx_retainer.conf | 8 ++++++++ apps/emqx_retainer/priv/emqx_retainer.schema | 7 +++++++ apps/emqx_retainer/src/emqx_retainer.erl | 12 ++++++++++-- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 0a883cee5..8b5c251c6 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -39,3 +39,11 @@ retainer.max_payload_size = 1MB ## ## Defaut: 0 retainer.expiry_interval = 0 + +## When the retained flag of the PUBLISH message is set and Payload is empty, +## whether to continue to publish the message. +## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038 +## +## Value: Boolean +## Default: false +#retainer.stop_publish_clear_msg = false diff --git a/apps/emqx_retainer/priv/emqx_retainer.schema b/apps/emqx_retainer/priv/emqx_retainer.schema index e598864e1..5e998aa16 100644 --- a/apps/emqx_retainer/priv/emqx_retainer.schema +++ b/apps/emqx_retainer/priv/emqx_retainer.schema @@ -28,3 +28,10 @@ {default, 0}, {datatype, [integer, {duration, ms}]} ]}. + +%% Stop publish clear message +%% {$configurable} +{mapping, "retainer.stop_publish_clear_msg", "emqx_retainer.stop_publish_clear_msg", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 340e6929d..33220eb1f 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -83,9 +83,14 @@ dispatch(Pid, Topic) -> %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, - payload = <<>>}, _Env) -> + payload = <<>>}, Env) -> mnesia:dirty_delete(?TAB, topic2tokens(Topic)), - {ok, Msg}; + case stop_publish_clear_msg(Env) of + true -> + {ok, emqx_message:set_header(allow_publish, false, Msg)}; + _ -> + {ok, Msg} + end; on_message_publish(Msg = #message{flags = #{retain := true}}, Env) -> Msg1 = emqx_message:set_header(retained, true, Msg), @@ -224,6 +229,9 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> "for payload is too big!", [Topic, iolist_size(Payload)]) end. +stop_publish_clear_msg(Env) -> + proplists:get_bool(stop_publish_clear_msg, Env). + is_table_full(Env) -> Limit = proplists:get_value(max_retained_messages, Env, 0), Limit > 0 andalso (retained_count() > Limit). From e323b662854e81aeab52cdf7a3ec924b8692fcb3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 1 Dec 2021 17:00:43 +0800 Subject: [PATCH 2/3] test(retainer): testcase for stop_publish_clear_msg --- apps/emqx_retainer/test/emqx_retainer_SUITE.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 1df042dd9..28f667b53 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -42,6 +42,8 @@ init_per_testcase(TestCase, Config) -> case TestCase of t_message_expiry_2 -> application:set_env(emqx_retainer, expiry_interval, 2000); + t_stop_publish_clear_msg -> + application:set_env(emqx_retainer, stop_publish_clear_msg, true); _ -> application:set_env(emqx_retainer, expiry_interval, 0) end, @@ -173,6 +175,19 @@ t_clean(_) -> ok = emqtt:disconnect(C1). +t_stop_publish_clear_msg(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + ?assertEqual(0, length(receive_messages(1))), + + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- From 10ffe11ba09ee0787b133c2246fe44033ce0ab8b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 1 Dec 2021 17:05:52 +0800 Subject: [PATCH 3/3] chore(retainer): update appup.src --- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- apps/emqx_retainer/src/emqx_retainer.appup.src | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index c3ffb9f90..c5ca7599d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src index a17e6ee2f..19e8e835f 100644 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ b/apps/emqx_retainer/src/emqx_retainer.appup.src @@ -1,13 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]+">>, [ {load_module, emqx_retainer, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]+">>, [ {load_module, emqx_retainer, brutal_purge, soft_purge, []} ]}, {<<".*">>, []}