diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 0a883cee5..8b5c251c6 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -39,3 +39,11 @@ retainer.max_payload_size = 1MB ## ## Defaut: 0 retainer.expiry_interval = 0 + +## When the retained flag of the PUBLISH message is set and Payload is empty, +## whether to continue to publish the message. +## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038 +## +## Value: Boolean +## Default: false +#retainer.stop_publish_clear_msg = false diff --git a/apps/emqx_retainer/priv/emqx_retainer.schema b/apps/emqx_retainer/priv/emqx_retainer.schema index e598864e1..5e998aa16 100644 --- a/apps/emqx_retainer/priv/emqx_retainer.schema +++ b/apps/emqx_retainer/priv/emqx_retainer.schema @@ -28,3 +28,10 @@ {default, 0}, {datatype, [integer, {duration, ms}]} ]}. + +%% Stop publish clear message +%% {$configurable} +{mapping, "retainer.stop_publish_clear_msg", "emqx_retainer.stop_publish_clear_msg", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 340e6929d..33220eb1f 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -83,9 +83,14 @@ dispatch(Pid, Topic) -> %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, - payload = <<>>}, _Env) -> + payload = <<>>}, Env) -> mnesia:dirty_delete(?TAB, topic2tokens(Topic)), - {ok, Msg}; + case stop_publish_clear_msg(Env) of + true -> + {ok, emqx_message:set_header(allow_publish, false, Msg)}; + _ -> + {ok, Msg} + end; on_message_publish(Msg = #message{flags = #{retain := true}}, Env) -> Msg1 = emqx_message:set_header(retained, true, Msg), @@ -224,6 +229,9 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> "for payload is too big!", [Topic, iolist_size(Payload)]) end. +stop_publish_clear_msg(Env) -> + proplists:get_bool(stop_publish_clear_msg, Env). + is_table_full(Env) -> Limit = proplists:get_value(max_retained_messages, Env, 0), Limit > 0 andalso (retained_count() > Limit).