Merge pull request #6343 from HJianBo/drop-empty-retain-msg
[4.3][Retainer] Allow to stop publish clear message
This commit is contained in:
commit
fdacb9040d
|
@ -39,3 +39,11 @@ retainer.max_payload_size = 1MB
|
||||||
##
|
##
|
||||||
## Defaut: 0
|
## Defaut: 0
|
||||||
retainer.expiry_interval = 0
|
retainer.expiry_interval = 0
|
||||||
|
|
||||||
|
## When the retained flag of the PUBLISH message is set and Payload is empty,
|
||||||
|
## whether to continue to publish the message.
|
||||||
|
## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
|
||||||
|
##
|
||||||
|
## Value: Boolean
|
||||||
|
## Default: false
|
||||||
|
#retainer.stop_publish_clear_msg = false
|
||||||
|
|
|
@ -28,3 +28,10 @@
|
||||||
{default, 0},
|
{default, 0},
|
||||||
{datatype, [integer, {duration, ms}]}
|
{datatype, [integer, {duration, ms}]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% Stop publish clear message
|
||||||
|
%% {$configurable}
|
||||||
|
{mapping, "retainer.stop_publish_clear_msg", "emqx_retainer.stop_publish_clear_msg", [
|
||||||
|
{default, false},
|
||||||
|
{datatype, {enum, [true, false]}}
|
||||||
|
]}.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_retainer,
|
{application, emqx_retainer,
|
||||||
[{description, "EMQ X Retainer"},
|
[{description, "EMQ X Retainer"},
|
||||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_retainer_sup]},
|
{registered, [emqx_retainer_sup]},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4\\.3\\.[0-1]+">>, [
|
||||||
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
|
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4\\.3\\.[0-1]+">>, [
|
||||||
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
|
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
|
|
|
@ -83,9 +83,14 @@ dispatch(Pid, Topic) ->
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
on_message_publish(Msg = #message{flags = #{retain := true},
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = <<>>}, _Env) ->
|
payload = <<>>}, Env) ->
|
||||||
mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
||||||
{ok, Msg};
|
case stop_publish_clear_msg(Env) of
|
||||||
|
true ->
|
||||||
|
{ok, emqx_message:set_header(allow_publish, false, Msg)};
|
||||||
|
_ ->
|
||||||
|
{ok, Msg}
|
||||||
|
end;
|
||||||
|
|
||||||
on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
||||||
Msg1 = emqx_message:set_header(retained, true, Msg),
|
Msg1 = emqx_message:set_header(retained, true, Msg),
|
||||||
|
@ -224,6 +229,9 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
||||||
"for payload is too big!", [Topic, iolist_size(Payload)])
|
"for payload is too big!", [Topic, iolist_size(Payload)])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
stop_publish_clear_msg(Env) ->
|
||||||
|
proplists:get_bool(stop_publish_clear_msg, Env).
|
||||||
|
|
||||||
is_table_full(Env) ->
|
is_table_full(Env) ->
|
||||||
Limit = proplists:get_value(max_retained_messages, Env, 0),
|
Limit = proplists:get_value(max_retained_messages, Env, 0),
|
||||||
Limit > 0 andalso (retained_count() > Limit).
|
Limit > 0 andalso (retained_count() > Limit).
|
||||||
|
|
|
@ -42,6 +42,8 @@ init_per_testcase(TestCase, Config) ->
|
||||||
case TestCase of
|
case TestCase of
|
||||||
t_message_expiry_2 ->
|
t_message_expiry_2 ->
|
||||||
application:set_env(emqx_retainer, expiry_interval, 2000);
|
application:set_env(emqx_retainer, expiry_interval, 2000);
|
||||||
|
t_stop_publish_clear_msg ->
|
||||||
|
application:set_env(emqx_retainer, stop_publish_clear_msg, true);
|
||||||
_ ->
|
_ ->
|
||||||
application:set_env(emqx_retainer, expiry_interval, 0)
|
application:set_env(emqx_retainer, expiry_interval, 0)
|
||||||
end,
|
end,
|
||||||
|
@ -173,6 +175,19 @@ t_clean(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_stop_publish_clear_msg(_) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
|
emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||||
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue