fix(config): configure a plain map for mqueue_priorities
This commit is contained in:
parent
88638da287
commit
499ab5d9c4
|
@ -1039,26 +1039,28 @@ zones.default {
|
||||||
##
|
##
|
||||||
## There's no priority table by default, hence all messages
|
## There's no priority table by default, hence all messages
|
||||||
## are treated equal.
|
## are treated equal.
|
||||||
## The top topicname in the table has the highest priority, and then
|
##
|
||||||
## the next one has the second highest priority, etc.
|
## Priority number [1-255]
|
||||||
## Messages for topics not in the priority table are treated as
|
##
|
||||||
|
## NOTE: comma and equal signs are not allowed for priority topic names
|
||||||
|
## NOTE: Messages for topics not in the priority table are treated as
|
||||||
## either highest or lowest priority depending on the configured
|
## either highest or lowest priority depending on the configured
|
||||||
## value for mqtt.mqueue_default_priority
|
## value for mqtt.mqueue_default_priority
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.mqtt.mqueue_priorities
|
## @doc zones.<name>.mqtt.mqueue_priorities
|
||||||
## ValueType: Array<TopicName>
|
## ValueType: Map | disabled
|
||||||
## Examples:
|
## Examples:
|
||||||
## To configure "t/1" > "t/2" > "t/3":
|
## To configure "topic/1" > "topic/2":
|
||||||
## mqueue_priorities: [t/1,t/2,t/3]
|
## mqueue_priorities: {"topic/1": 10, "topic/2": 8}
|
||||||
## Default: []
|
## Default: disabled
|
||||||
mqueue_priorities: []
|
mqueue_priorities: disabled
|
||||||
|
|
||||||
## Default to highest priority for topics not matching priority table
|
## Default to highest priority for topics not matching priority table
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.mqtt.mqueue_default_priority
|
## @doc zones.<name>.mqtt.mqueue_default_priority
|
||||||
## ValueType: highest | lowest
|
## ValueType: highest | lowest
|
||||||
## Default: highest
|
## Default: lowest
|
||||||
mqueue_default_priority: highest
|
mqueue_default_priority: lowest
|
||||||
|
|
||||||
## Whether to enqueue QoS0 messages.
|
## Whether to enqueue QoS0 messages.
|
||||||
##
|
##
|
||||||
|
|
|
@ -35,12 +35,6 @@
|
||||||
|
|
||||||
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Configs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(NO_PRIORITY_TABLE, none).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics' prefix: $SYS | $queue | $share
|
%% Topics' prefix: $SYS | $queue | $share
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -646,7 +646,7 @@ serialize_properties(Props) when is_map(Props) ->
|
||||||
Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
|
Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
|
||||||
[serialize_variable_byte_integer(byte_size(Bin)), Bin].
|
[serialize_variable_byte_integer(byte_size(Bin)), Bin].
|
||||||
|
|
||||||
serialize_property(_, undefined) ->
|
serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined ->
|
||||||
<<>>;
|
<<>>;
|
||||||
serialize_property('Payload-Format-Indicator', Val) ->
|
serialize_property('Payload-Format-Indicator', Val) ->
|
||||||
<<16#01, Val>>;
|
<<16#01, Val>>;
|
||||||
|
|
|
@ -67,6 +67,8 @@
|
||||||
, dropped/1
|
, dropped/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(NO_PRIORITY_TABLE, disabled).
|
||||||
|
|
||||||
-export_type([mqueue/0, options/0]).
|
-export_type([mqueue/0, options/0]).
|
||||||
|
|
||||||
-type(topic() :: emqx_topic:topic()).
|
-type(topic() :: emqx_topic:topic()).
|
||||||
|
|
|
@ -277,7 +277,7 @@ fields("mqtt") ->
|
||||||
, {"await_rel_timeout", t(duration_s(), undefined, "300s")}
|
, {"await_rel_timeout", t(duration_s(), undefined, "300s")}
|
||||||
, {"session_expiry_interval", t(duration_s(), undefined, "2h")}
|
, {"session_expiry_interval", t(duration_s(), undefined, "2h")}
|
||||||
, {"max_mqueue_len", maybe_infinity(integer(), 1000)}
|
, {"max_mqueue_len", maybe_infinity(integer(), 1000)}
|
||||||
, {"mqueue_priorities", t(comma_separated_list(), undefined, "none")}
|
, {"mqueue_priorities", maybe_disabled(map())}
|
||||||
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
|
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
|
||||||
, {"mqueue_store_qos0", t(boolean(), undefined, true)}
|
, {"mqueue_store_qos0", t(boolean(), undefined, true)}
|
||||||
, {"use_username_as_clientid", t(boolean(), undefined, false)}
|
, {"use_username_as_clientid", t(boolean(), undefined, false)}
|
||||||
|
|
|
@ -25,18 +25,10 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:boot_modules(all),
|
emqx_ct_helpers:boot_modules(all),
|
||||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
emqx_config:put_listener_conf(default, mqtt_tcp, [flapping_detect, enable], true),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
set_special_configs(emqx) ->
|
|
||||||
emqx_zone:set_env(external, enable_flapping_detect, true),
|
|
||||||
application:set_env(emqx, flapping_detect_policy,
|
|
||||||
#{threshold => 3,
|
|
||||||
duration => 100,
|
|
||||||
banned_interval => 2
|
|
||||||
});
|
|
||||||
set_special_configs(_App) -> ok.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]),
|
emqx_ct_helpers:stop_apps([]),
|
||||||
ekka_mnesia:delete_schema(), %% Clean emqx_banned table
|
ekka_mnesia:delete_schema(), %% Clean emqx_banned table
|
||||||
|
|
|
@ -156,6 +156,15 @@ t_async_set_keepalive('end', _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_async_set_keepalive(_) ->
|
t_async_set_keepalive(_) ->
|
||||||
|
case os:type() of
|
||||||
|
{unix, darwin} ->
|
||||||
|
%% Mac OSX don't support the feature
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
do_async_set_keepalive()
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_async_set_keepalive() ->
|
||||||
ClientID = <<"client-tcp-keepalive">>,
|
ClientID = <<"client-tcp-keepalive">>,
|
||||||
{ok, Client} = emqtt:start_link([{host, "localhost"},
|
{ok, Client} = emqtt:start_link([{host, "localhost"},
|
||||||
{proto_ver,v5},
|
{proto_ver,v5},
|
||||||
|
|
|
@ -265,7 +265,7 @@ t_kick_1(_Config) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
% mqtt connection kicked by coap with same client id
|
% mqtt connection kicked by coap with same client id
|
||||||
t_acl(Config) ->
|
t_acl(_Config) ->
|
||||||
OldPath = emqx:get_env(plugins_etc_dir),
|
OldPath = emqx:get_env(plugins_etc_dir),
|
||||||
application:set_env(emqx, plugins_etc_dir,
|
application:set_env(emqx, plugins_etc_dir,
|
||||||
emqx_ct_helpers:deps_path(emqx_authz, "test")),
|
emqx_ct_helpers:deps_path(emqx_authz, "test")),
|
||||||
|
|
|
@ -170,7 +170,6 @@ t_subscribe_case02(_) ->
|
||||||
ReturnCode = 0,
|
ReturnCode = 0,
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
|
||||||
ClientId = ?CLIENTID,
|
|
||||||
send_connect_msg(Socket, ?CLIENTID),
|
send_connect_msg(Socket, ?CLIENTID),
|
||||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue