Merge pull request #6364 from HJianBo/stop_publish_clear_msg_for_5
Allow to stop publish clear message
This commit is contained in:
commit
1ae1460e44
|
@ -29,6 +29,13 @@ emqx_retainer {
|
||||||
## Default: 0s
|
## Default: 0s
|
||||||
msg_expiry_interval = 0s
|
msg_expiry_interval = 0s
|
||||||
|
|
||||||
|
## When the retained flag of the PUBLISH message is set and Payload is empty,
|
||||||
|
## whether to continue to publish the message.
|
||||||
|
## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
|
||||||
|
##
|
||||||
|
## Default: false
|
||||||
|
#stop_publish_clear_msg = false
|
||||||
|
|
||||||
## The message read and deliver flow rate control
|
## The message read and deliver flow rate control
|
||||||
## When a client subscribe to a wildcard topic, may many retained messages will be loaded.
|
## When a client subscribe to a wildcard topic, may many retained messages will be loaded.
|
||||||
## If you don't want these data loaded to the memory all at once, you can use this to control.
|
## If you don't want these data loaded to the memory all at once, you can use this to control.
|
||||||
|
|
|
@ -88,7 +88,12 @@ on_message_publish(Msg = #message{flags = #{retain := true},
|
||||||
payload = <<>>},
|
payload = <<>>},
|
||||||
Context) ->
|
Context) ->
|
||||||
delete_message(Context, Topic),
|
delete_message(Context, Topic),
|
||||||
{ok, Msg};
|
case get_stop_publish_clear_msg() of
|
||||||
|
true ->
|
||||||
|
{ok, emqx_message:set_header(allow_publish, false, Msg)};
|
||||||
|
_ ->
|
||||||
|
{ok, Msg}
|
||||||
|
end;
|
||||||
|
|
||||||
on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
|
on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
|
||||||
Msg1 = emqx_message:set_header(retained, true, Msg),
|
Msg1 = emqx_message:set_header(retained, true, Msg),
|
||||||
|
@ -157,6 +162,9 @@ get_expiry_time(#message{timestamp = Ts}) ->
|
||||||
_ -> Ts + Interval
|
_ -> Ts + Interval
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_stop_publish_clear_msg() ->
|
||||||
|
emqx_conf:get([?APP, stop_publish_clear_msg], false).
|
||||||
|
|
||||||
-spec update_config(hocon:config()) -> ok.
|
-spec update_config(hocon:config()) -> ok.
|
||||||
update_config(Conf) ->
|
update_config(Conf) ->
|
||||||
gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}).
|
gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}).
|
||||||
|
|
|
@ -14,6 +14,7 @@ fields("emqx_retainer") ->
|
||||||
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
|
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
|
||||||
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
|
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
|
||||||
, {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
|
, {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
|
||||||
|
, {stop_publish_clear_msg, sc(boolean(), false)}
|
||||||
, {config, config()}
|
, {config, config()}
|
||||||
];
|
];
|
||||||
|
|
||||||
|
|
|
@ -195,6 +195,21 @@ t_clean(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_stop_publish_clear_msg(_) ->
|
||||||
|
emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}),
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
|
emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||||
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
|
||||||
|
emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => false}),
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_flow_control(_) ->
|
t_flow_control(_) ->
|
||||||
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
|
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
|
||||||
<<"msg_deliver_quota">> => 1,
|
<<"msg_deliver_quota">> => 1,
|
||||||
|
|
Loading…
Reference in New Issue