feat(retainer): allow to stop publish clear msg

This commit is contained in:
JianBo He 2021-12-03 14:41:42 +08:00
parent 85a6f0f1e8
commit 4b3b29873a
3 changed files with 17 additions and 1 deletions

View File

@ -29,6 +29,13 @@ emqx_retainer {
## Default: 0s ## Default: 0s
msg_expiry_interval = 0s msg_expiry_interval = 0s
## When the retained flag of the PUBLISH message is set and Payload is empty,
## whether to continue to publish the message.
## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
##
## Default: false
#stop_publish_clear_msg = false
## The message read and deliver flow rate control ## The message read and deliver flow rate control
## When a client subscribe to a wildcard topic, may many retained messages will be loaded. ## When a client subscribe to a wildcard topic, may many retained messages will be loaded.
## If you don't want these data loaded to the memory all at once, you can use this to control. ## If you don't want these data loaded to the memory all at once, you can use this to control.

View File

@ -88,7 +88,12 @@ on_message_publish(Msg = #message{flags = #{retain := true},
payload = <<>>}, payload = <<>>},
Context) -> Context) ->
delete_message(Context, Topic), delete_message(Context, Topic),
{ok, Msg}; case get_stop_publish_clear_msg() of
true ->
{ok, emqx_message:set_header(allow_publish, false, Msg)};
_ ->
{ok, Msg}
end;
on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
Msg1 = emqx_message:set_header(retained, true, Msg), Msg1 = emqx_message:set_header(retained, true, Msg),
@ -157,6 +162,9 @@ get_expiry_time(#message{timestamp = Ts}) ->
_ -> Ts + Interval _ -> Ts + Interval
end. end.
get_stop_publish_clear_msg() ->
emqx_conf:get([?APP, stop_publish_clear_msg], false).
-spec update_config(hocon:config()) -> ok. -spec update_config(hocon:config()) -> ok.
update_config(Conf) -> update_config(Conf) ->
gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}). gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}).

View File

@ -14,6 +14,7 @@ fields("emqx_retainer") ->
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
, {max_payload_size, sc(emqx_schema:bytesize(), "1MB")} , {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
, {stop_publish_clear_msg, sc(boolean(), false)}
, {config, config()} , {config, config()}
]; ];