Merge pull request #11034 from zhongwencool/mqtt-conf

feat:  move shared_subscription_strategy from broker to mqtt
This commit is contained in:
zhongwencool 2023-06-14 10:00:38 +08:00 committed by GitHub
commit 7f0f40cb58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 305 additions and 279 deletions

View File

@ -19,5 +19,6 @@
-define(TOMBSTONE_TYPE, marked_for_deletion). -define(TOMBSTONE_TYPE, marked_for_deletion).
-define(TOMBSTONE_VALUE, <<"marked_for_deletion">>). -define(TOMBSTONE_VALUE, <<"marked_for_deletion">>).
-define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion). -define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion).
-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
-endif. -endif.

View File

@ -19,6 +19,7 @@
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-include("logger.hrl"). -include("logger.hrl").
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_schema.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ -export([
@ -108,7 +109,6 @@
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]). -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
-define(MAX_KEEP_BACKUP_CONFIGS, 10). -define(MAX_KEEP_BACKUP_CONFIGS, 10).
-export_type([ -export_type([

View File

@ -226,7 +226,10 @@ roots(medium) ->
{"broker", {"broker",
sc( sc(
ref("broker"), ref("broker"),
#{desc => ?DESC(broker)} #{
desc => ?DESC(broker),
importance => ?IMPORTANCE_HIDDEN
}
)}, )},
{"sys_topics", {"sys_topics",
sc( sc(
@ -439,251 +442,7 @@ fields("authz_cache") ->
)} )}
]; ];
fields("mqtt") -> fields("mqtt") ->
[ mqtt_general() ++ mqtt_session();
{"idle_timeout",
sc(
hoconsc:union([infinity, duration()]),
#{
default => <<"15s">>,
desc => ?DESC(mqtt_idle_timeout)
}
)},
{"max_packet_size",
sc(
bytesize(),
#{
default => <<"1MB">>,
desc => ?DESC(mqtt_max_packet_size)
}
)},
{"max_clientid_len",
sc(
range(23, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_clientid_len)
}
)},
{"max_topic_levels",
sc(
range(1, 65535),
#{
default => 128,
desc => ?DESC(mqtt_max_topic_levels)
}
)},
{"max_qos_allowed",
sc(
qos(),
#{
default => 2,
desc => ?DESC(mqtt_max_qos_allowed)
}
)},
{"max_topic_alias",
sc(
range(0, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_topic_alias)
}
)},
{"retain_available",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_retain_available)
}
)},
{"wildcard_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_wildcard_subscription)
}
)},
{"shared_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_shared_subscription)
}
)},
{"exclusive_subscription",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_exclusive_subscription)
}
)},
{"ignore_loop_deliver",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_ignore_loop_deliver)
}
)},
{"strict_mode",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_strict_mode)
}
)},
{"response_information",
sc(
string(),
#{
default => <<"">>,
desc => ?DESC(mqtt_response_information)
}
)},
{"server_keepalive",
sc(
hoconsc:union([integer(), disabled]),
#{
default => disabled,
desc => ?DESC(mqtt_server_keepalive)
}
)},
{"keepalive_backoff",
sc(
number(),
#{
default => ?DEFAULT_BACKOFF,
%% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"max_subscriptions",
sc(
hoconsc:union([range(1, inf), infinity]),
#{
default => infinity,
desc => ?DESC(mqtt_max_subscriptions)
}
)},
{"upgrade_qos",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_upgrade_qos)
}
)},
{"max_inflight",
sc(
range(1, 65535),
#{
default => 32,
desc => ?DESC(mqtt_max_inflight)
}
)},
{"retry_interval",
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_retry_interval)
}
)},
{"max_awaiting_rel",
sc(
hoconsc:union([integer(), infinity]),
#{
default => 100,
desc => ?DESC(mqtt_max_awaiting_rel)
}
)},
{"await_rel_timeout",
sc(
duration(),
#{
default => <<"300s">>,
desc => ?DESC(mqtt_await_rel_timeout)
}
)},
{"session_expiry_interval",
sc(
duration(),
#{
default => <<"2h">>,
desc => ?DESC(mqtt_session_expiry_interval)
}
)},
{"max_mqueue_len",
sc(
hoconsc:union([non_neg_integer(), infinity]),
#{
default => 1000,
desc => ?DESC(mqtt_max_mqueue_len)
}
)},
{"mqueue_priorities",
sc(
hoconsc:union([disabled, map()]),
#{
default => disabled,
desc => ?DESC(mqtt_mqueue_priorities)
}
)},
{"mqueue_default_priority",
sc(
hoconsc:enum([highest, lowest]),
#{
default => lowest,
desc => ?DESC(mqtt_mqueue_default_priority)
}
)},
{"mqueue_store_qos0",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_mqueue_store_qos0)
}
)},
{"use_username_as_clientid",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_use_username_as_clientid)
}
)},
{"peer_cert_as_username",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_username)
}
)},
{"peer_cert_as_clientid",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_clientid)
}
)}
];
fields("zone") -> fields("zone") ->
emqx_zone_schema:zones_without_default(); emqx_zone_schema:zones_without_default();
fields("flapping_detect") -> fields("flapping_detect") ->
@ -1563,22 +1322,7 @@ fields("broker") ->
desc => ?DESC(broker_session_locking_strategy) desc => ?DESC(broker_session_locking_strategy)
} }
)}, )},
{"shared_subscription_strategy", shared_subscription_strategy(),
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(broker_shared_subscription_strategy)
}
)},
{"shared_dispatch_ack_enabled", {"shared_dispatch_ack_enabled",
sc( sc(
boolean(), boolean(),
@ -3564,3 +3308,283 @@ flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) -
Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false}; Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
flapping_detect_converter(Conf, _Opts) -> flapping_detect_converter(Conf, _Opts) ->
Conf. Conf.
mqtt_general() ->
[
{"idle_timeout",
sc(
hoconsc:union([infinity, duration()]),
#{
default => <<"15s">>,
desc => ?DESC(mqtt_idle_timeout)
}
)},
{"max_packet_size",
sc(
bytesize(),
#{
default => <<"1MB">>,
desc => ?DESC(mqtt_max_packet_size)
}
)},
{"max_clientid_len",
sc(
range(23, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_clientid_len)
}
)},
{"max_topic_levels",
sc(
range(1, 65535),
#{
default => 128,
desc => ?DESC(mqtt_max_topic_levels)
}
)},
{"max_topic_alias",
sc(
range(0, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_topic_alias)
}
)},
{"retain_available",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_retain_available)
}
)},
{"wildcard_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_wildcard_subscription)
}
)},
{"shared_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_shared_subscription)
}
)},
shared_subscription_strategy(),
{"exclusive_subscription",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_exclusive_subscription)
}
)},
{"ignore_loop_deliver",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_ignore_loop_deliver)
}
)},
{"strict_mode",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_strict_mode)
}
)},
{"response_information",
sc(
string(),
#{
default => <<"">>,
desc => ?DESC(mqtt_response_information)
}
)},
{"server_keepalive",
sc(
hoconsc:union([integer(), disabled]),
#{
default => disabled,
desc => ?DESC(mqtt_server_keepalive)
}
)},
{"keepalive_backoff",
sc(
number(),
#{
default => ?DEFAULT_BACKOFF,
%% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"retry_interval",
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_retry_interval)
}
)},
{"use_username_as_clientid",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_use_username_as_clientid)
}
)},
{"peer_cert_as_username",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_username)
}
)},
{"peer_cert_as_clientid",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_clientid)
}
)}
].
%% All session's importance should be lower than general part to organize document.
mqtt_session() ->
[
{"session_expiry_interval",
sc(
duration(),
#{
default => <<"2h">>,
desc => ?DESC(mqtt_session_expiry_interval),
importance => ?IMPORTANCE_LOW
}
)},
{"max_awaiting_rel",
sc(
hoconsc:union([integer(), infinity]),
#{
default => 100,
desc => ?DESC(mqtt_max_awaiting_rel),
importance => ?IMPORTANCE_LOW
}
)},
{"max_qos_allowed",
sc(
qos(),
#{
default => 2,
desc => ?DESC(mqtt_max_qos_allowed),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_priorities",
sc(
hoconsc:union([disabled, map()]),
#{
default => disabled,
desc => ?DESC(mqtt_mqueue_priorities),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_default_priority",
sc(
hoconsc:enum([highest, lowest]),
#{
default => lowest,
desc => ?DESC(mqtt_mqueue_default_priority),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_store_qos0",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_mqueue_store_qos0),
importance => ?IMPORTANCE_LOW
}
)},
{"max_mqueue_len",
sc(
hoconsc:union([non_neg_integer(), infinity]),
#{
default => 1000,
desc => ?DESC(mqtt_max_mqueue_len),
importance => ?IMPORTANCE_LOW
}
)},
{"max_inflight",
sc(
range(1, 65535),
#{
default => 32,
desc => ?DESC(mqtt_max_inflight),
importance => ?IMPORTANCE_LOW
}
)},
{"max_subscriptions",
sc(
hoconsc:union([range(1, inf), infinity]),
#{
default => infinity,
desc => ?DESC(mqtt_max_subscriptions),
importance => ?IMPORTANCE_LOW
}
)},
{"upgrade_qos",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_upgrade_qos),
importance => ?IMPORTANCE_LOW
}
)},
{"await_rel_timeout",
sc(
duration(),
#{
default => <<"300s">>,
desc => ?DESC(mqtt_await_rel_timeout),
importance => ?IMPORTANCE_LOW
}
)}
].
shared_subscription_strategy() ->
{"shared_subscription_strategy",
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(broker_shared_subscription_strategy)
}
)}.

View File

@ -18,6 +18,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqx_schema.hrl").
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -158,16 +159,14 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
-spec strategy(emqx_types:group()) -> strategy(). -spec strategy(emqx_types:group()) -> strategy().
strategy(Group) -> strategy(Group) ->
try try binary_to_existing_atom(Group) of
emqx:get_config([ GroupAtom ->
broker, Key = [broker, shared_subscription_group, GroupAtom, strategy],
shared_subscription_group, case emqx:get_config(Key, ?CONFIG_NOT_FOUND_MAGIC) of
binary_to_existing_atom(Group), ?CONFIG_NOT_FOUND_MAGIC -> get_default_shared_subscription_strategy();
strategy Strategy -> Strategy
]) end
catch catch
error:{config_not_found, _} ->
get_default_shared_subscription_strategy();
error:badarg -> error:badarg ->
get_default_shared_subscription_strategy() get_default_shared_subscription_strategy()
end. end.
@ -190,7 +189,7 @@ do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
do_dispatch(SubPid, Group, Topic, Msg, fresh) -> do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of case ack_enabled() of
true -> true ->
%% TODO: delete this clase after 5.1.0 %% TODO: delete this case after 5.1.0
do_dispatch_with_ack(SubPid, Group, Topic, Msg); do_dispatch_with_ack(SubPid, Group, Topic, Msg);
false -> false ->
send(SubPid, Topic, {deliver, Topic, Msg}) send(SubPid, Topic, {deliver, Topic, Msg})
@ -240,7 +239,7 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
with_redispatch_to(Msg, Group, Topic) -> with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
%% @hidden Redispatch is neede only for the messages with redispatch_to header added. %% @hidden Redispatch is needed only for the messages with redispatch_to header added.
is_redispatch_needed(#message{} = Msg) -> is_redispatch_needed(#message{} = Msg) ->
case get_redispatch_to(Msg) of case get_redispatch_to(Msg) of
?REDISPATCH_TO(_, _) -> ?REDISPATCH_TO(_, _) ->
@ -555,4 +554,4 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
end). end).
get_default_shared_subscription_strategy() -> get_default_shared_subscription_strategy() ->
emqx:get_config([broker, shared_subscription_strategy]). emqx:get_config([mqtt, shared_subscription_strategy]).

View File

@ -440,6 +440,7 @@ zone_global_defaults() ->
server_keepalive => disabled, server_keepalive => disabled,
session_expiry_interval => 7200000, session_expiry_interval => 7200000,
shared_subscription => true, shared_subscription => true,
shared_subscription_strategy => round_robin,
strict_mode => false, strict_mode => false,
upgrade_qos => false, upgrade_qos => false,
use_username_as_clientid => false, use_username_as_clientid => false,

View File

@ -769,12 +769,12 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
%% Expected behaviour: %% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) -> t_dispatch_qos2({init, Config}) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
Config; Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) -> t_dispatch_qos2({'end', Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
t_dispatch_qos2(Config) when is_list(Config) -> t_dispatch_qos2(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>, Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
@ -923,12 +923,12 @@ t_session_takeover(Config) when is_list(Config) ->
ok. ok.
t_session_kicked({init, Config}) when is_list(Config) -> t_session_kicked({init, Config}) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
Config; Config;
t_session_kicked({'end', Config}) when is_list(Config) -> t_session_kicked({'end', Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
t_session_kicked(Config) when is_list(Config) -> t_session_kicked(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>, Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
@ -1019,12 +1019,12 @@ ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true). ensure_config(Strategy, _AckEnabled = true).
ensure_config(Strategy, AckEnabled) -> ensure_config(Strategy, AckEnabled) ->
emqx_config:put([broker, shared_subscription_strategy], Strategy), emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled), emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
ok. ok.
ensure_node_config(Node, Strategy) -> ensure_node_config(Node, Strategy) ->
rpc:call(Node, emqx_config, force_put, [[broker, shared_subscription_strategy], Strategy]). rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]).
ensure_group_config(Group2Strategy) -> ensure_group_config(Group2Strategy) ->
lists:foreach( lists:foreach(

View File

@ -0,0 +1 @@
Hide the broker and move the `broker.shared_subscription_strategy` to `mqtt.shared_subscription_strategy` as it belongs to `mqtt`.