diff --git a/apps/emqx/include/emqx_schema.hrl b/apps/emqx/include/emqx_schema.hrl index 307bb20c5..0dae6bb27 100644 --- a/apps/emqx/include/emqx_schema.hrl +++ b/apps/emqx/include/emqx_schema.hrl @@ -19,5 +19,6 @@ -define(TOMBSTONE_TYPE, marked_for_deletion). -define(TOMBSTONE_VALUE, <<"marked_for_deletion">>). -define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion). +-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound'). -endif. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index f2f6eb264..58f53b134 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -19,6 +19,7 @@ -elvis([{elvis_style, god_modules, disable}]). -include("logger.hrl"). -include("emqx.hrl"). +-include("emqx_schema.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ @@ -108,7 +109,6 @@ -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]). --define(CONFIG_NOT_FOUND_MAGIC, '$0tFound'). -define(MAX_KEEP_BACKUP_CONFIGS, 10). -export_type([ diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e9fb47eec..387f2d64e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -226,7 +226,10 @@ roots(medium) -> {"broker", sc( ref("broker"), - #{desc => ?DESC(broker)} + #{ + desc => ?DESC(broker), + importance => ?IMPORTANCE_HIDDEN + } )}, {"sys_topics", sc( @@ -439,251 +442,7 @@ fields("authz_cache") -> )} ]; fields("mqtt") -> - [ - {"idle_timeout", - sc( - hoconsc:union([infinity, duration()]), - #{ - default => <<"15s">>, - desc => ?DESC(mqtt_idle_timeout) - } - )}, - {"max_packet_size", - sc( - bytesize(), - #{ - default => <<"1MB">>, - desc => ?DESC(mqtt_max_packet_size) - } - )}, - {"max_clientid_len", - sc( - range(23, 65535), - #{ - default => 65535, - desc => ?DESC(mqtt_max_clientid_len) - } - )}, - {"max_topic_levels", - sc( - range(1, 65535), - #{ - default => 128, - desc => ?DESC(mqtt_max_topic_levels) - } - )}, - {"max_qos_allowed", - sc( - qos(), - #{ - default => 2, - desc => ?DESC(mqtt_max_qos_allowed) - } - )}, - {"max_topic_alias", - sc( - range(0, 65535), - #{ - default => 65535, - desc => ?DESC(mqtt_max_topic_alias) - } - )}, - {"retain_available", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(mqtt_retain_available) - } - )}, - {"wildcard_subscription", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(mqtt_wildcard_subscription) - } - )}, - {"shared_subscription", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(mqtt_shared_subscription) - } - )}, - {"exclusive_subscription", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(mqtt_exclusive_subscription) - } - )}, - {"ignore_loop_deliver", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(mqtt_ignore_loop_deliver) - } - )}, - {"strict_mode", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(mqtt_strict_mode) - } - )}, - {"response_information", - sc( - string(), - #{ - default => <<"">>, - desc => ?DESC(mqtt_response_information) - } - )}, - {"server_keepalive", - sc( - hoconsc:union([integer(), disabled]), - #{ - default => disabled, - desc => ?DESC(mqtt_server_keepalive) - } - )}, - {"keepalive_backoff", - sc( - number(), - #{ - default => ?DEFAULT_BACKOFF, - %% Must add required => false, zone schema has no default. - required => false, - importance => ?IMPORTANCE_HIDDEN - } - )}, - {"keepalive_multiplier", - sc( - number(), - #{ - default => ?DEFAULT_MULTIPLIER, - validator => fun ?MODULE:validate_keepalive_multiplier/1, - desc => ?DESC(mqtt_keepalive_multiplier) - } - )}, - {"max_subscriptions", - sc( - hoconsc:union([range(1, inf), infinity]), - #{ - default => infinity, - desc => ?DESC(mqtt_max_subscriptions) - } - )}, - {"upgrade_qos", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(mqtt_upgrade_qos) - } - )}, - {"max_inflight", - sc( - range(1, 65535), - #{ - default => 32, - desc => ?DESC(mqtt_max_inflight) - } - )}, - {"retry_interval", - sc( - duration(), - #{ - default => <<"30s">>, - desc => ?DESC(mqtt_retry_interval) - } - )}, - {"max_awaiting_rel", - sc( - hoconsc:union([integer(), infinity]), - #{ - default => 100, - desc => ?DESC(mqtt_max_awaiting_rel) - } - )}, - {"await_rel_timeout", - sc( - duration(), - #{ - default => <<"300s">>, - desc => ?DESC(mqtt_await_rel_timeout) - } - )}, - {"session_expiry_interval", - sc( - duration(), - #{ - default => <<"2h">>, - desc => ?DESC(mqtt_session_expiry_interval) - } - )}, - {"max_mqueue_len", - sc( - hoconsc:union([non_neg_integer(), infinity]), - #{ - default => 1000, - desc => ?DESC(mqtt_max_mqueue_len) - } - )}, - {"mqueue_priorities", - sc( - hoconsc:union([disabled, map()]), - #{ - default => disabled, - desc => ?DESC(mqtt_mqueue_priorities) - } - )}, - {"mqueue_default_priority", - sc( - hoconsc:enum([highest, lowest]), - #{ - default => lowest, - desc => ?DESC(mqtt_mqueue_default_priority) - } - )}, - {"mqueue_store_qos0", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(mqtt_mqueue_store_qos0) - } - )}, - {"use_username_as_clientid", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(mqtt_use_username_as_clientid) - } - )}, - {"peer_cert_as_username", - sc( - hoconsc:enum([disabled, cn, dn, crt, pem, md5]), - #{ - default => disabled, - desc => ?DESC(mqtt_peer_cert_as_username) - } - )}, - {"peer_cert_as_clientid", - sc( - hoconsc:enum([disabled, cn, dn, crt, pem, md5]), - #{ - default => disabled, - desc => ?DESC(mqtt_peer_cert_as_clientid) - } - )} - ]; + mqtt_general() ++ mqtt_session(); fields("zone") -> emqx_zone_schema:zones_without_default(); fields("flapping_detect") -> @@ -1563,22 +1322,7 @@ fields("broker") -> desc => ?DESC(broker_session_locking_strategy) } )}, - {"shared_subscription_strategy", - sc( - hoconsc:enum([ - random, - round_robin, - round_robin_per_group, - sticky, - local, - hash_topic, - hash_clientid - ]), - #{ - default => round_robin, - desc => ?DESC(broker_shared_subscription_strategy) - } - )}, + shared_subscription_strategy(), {"shared_dispatch_ack_enabled", sc( boolean(), @@ -3564,3 +3308,283 @@ flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) - Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false}; flapping_detect_converter(Conf, _Opts) -> Conf. +mqtt_general() -> + [ + {"idle_timeout", + sc( + hoconsc:union([infinity, duration()]), + #{ + default => <<"15s">>, + desc => ?DESC(mqtt_idle_timeout) + } + )}, + {"max_packet_size", + sc( + bytesize(), + #{ + default => <<"1MB">>, + desc => ?DESC(mqtt_max_packet_size) + } + )}, + {"max_clientid_len", + sc( + range(23, 65535), + #{ + default => 65535, + desc => ?DESC(mqtt_max_clientid_len) + } + )}, + {"max_topic_levels", + sc( + range(1, 65535), + #{ + default => 128, + desc => ?DESC(mqtt_max_topic_levels) + } + )}, + {"max_topic_alias", + sc( + range(0, 65535), + #{ + default => 65535, + desc => ?DESC(mqtt_max_topic_alias) + } + )}, + {"retain_available", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_retain_available) + } + )}, + {"wildcard_subscription", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_wildcard_subscription) + } + )}, + {"shared_subscription", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_shared_subscription) + } + )}, + shared_subscription_strategy(), + {"exclusive_subscription", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(mqtt_exclusive_subscription) + } + )}, + {"ignore_loop_deliver", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(mqtt_ignore_loop_deliver) + } + )}, + {"strict_mode", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(mqtt_strict_mode) + } + )}, + {"response_information", + sc( + string(), + #{ + default => <<"">>, + desc => ?DESC(mqtt_response_information) + } + )}, + {"server_keepalive", + sc( + hoconsc:union([integer(), disabled]), + #{ + default => disabled, + desc => ?DESC(mqtt_server_keepalive) + } + )}, + {"keepalive_backoff", + sc( + number(), + #{ + default => ?DEFAULT_BACKOFF, + %% Must add required => false, zone schema has no default. + required => false, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"keepalive_multiplier", + sc( + number(), + #{ + default => ?DEFAULT_MULTIPLIER, + validator => fun ?MODULE:validate_keepalive_multiplier/1, + desc => ?DESC(mqtt_keepalive_multiplier) + } + )}, + {"retry_interval", + sc( + duration(), + #{ + default => <<"30s">>, + desc => ?DESC(mqtt_retry_interval) + } + )}, + {"use_username_as_clientid", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(mqtt_use_username_as_clientid) + } + )}, + {"peer_cert_as_username", + sc( + hoconsc:enum([disabled, cn, dn, crt, pem, md5]), + #{ + default => disabled, + desc => ?DESC(mqtt_peer_cert_as_username) + } + )}, + {"peer_cert_as_clientid", + sc( + hoconsc:enum([disabled, cn, dn, crt, pem, md5]), + #{ + default => disabled, + desc => ?DESC(mqtt_peer_cert_as_clientid) + } + )} + ]. +%% All session's importance should be lower than general part to organize document. +mqtt_session() -> + [ + {"session_expiry_interval", + sc( + duration(), + #{ + default => <<"2h">>, + desc => ?DESC(mqtt_session_expiry_interval), + importance => ?IMPORTANCE_LOW + } + )}, + {"max_awaiting_rel", + sc( + hoconsc:union([integer(), infinity]), + #{ + default => 100, + desc => ?DESC(mqtt_max_awaiting_rel), + importance => ?IMPORTANCE_LOW + } + )}, + {"max_qos_allowed", + sc( + qos(), + #{ + default => 2, + desc => ?DESC(mqtt_max_qos_allowed), + importance => ?IMPORTANCE_LOW + } + )}, + {"mqueue_priorities", + sc( + hoconsc:union([disabled, map()]), + #{ + default => disabled, + desc => ?DESC(mqtt_mqueue_priorities), + importance => ?IMPORTANCE_LOW + } + )}, + {"mqueue_default_priority", + sc( + hoconsc:enum([highest, lowest]), + #{ + default => lowest, + desc => ?DESC(mqtt_mqueue_default_priority), + importance => ?IMPORTANCE_LOW + } + )}, + {"mqueue_store_qos0", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_mqueue_store_qos0), + importance => ?IMPORTANCE_LOW + } + )}, + {"max_mqueue_len", + sc( + hoconsc:union([non_neg_integer(), infinity]), + #{ + default => 1000, + desc => ?DESC(mqtt_max_mqueue_len), + importance => ?IMPORTANCE_LOW + } + )}, + {"max_inflight", + sc( + range(1, 65535), + #{ + default => 32, + desc => ?DESC(mqtt_max_inflight), + importance => ?IMPORTANCE_LOW + } + )}, + {"max_subscriptions", + sc( + hoconsc:union([range(1, inf), infinity]), + #{ + default => infinity, + desc => ?DESC(mqtt_max_subscriptions), + importance => ?IMPORTANCE_LOW + } + )}, + {"upgrade_qos", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(mqtt_upgrade_qos), + importance => ?IMPORTANCE_LOW + } + )}, + {"await_rel_timeout", + sc( + duration(), + #{ + default => <<"300s">>, + desc => ?DESC(mqtt_await_rel_timeout), + importance => ?IMPORTANCE_LOW + } + )} + ]. + +shared_subscription_strategy() -> + {"shared_subscription_strategy", + sc( + hoconsc:enum([ + random, + round_robin, + round_robin_per_group, + sticky, + local, + hash_topic, + hash_clientid + ]), + #{ + default => round_robin, + desc => ?DESC(broker_shared_subscription_strategy) + } + )}. diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 3a370ddba..87d0554c0 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -18,6 +18,7 @@ -behaviour(gen_server). +-include("emqx_schema.hrl"). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). @@ -158,24 +159,18 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> -spec strategy(emqx_types:group()) -> strategy(). strategy(Group) -> - try - emqx:get_config([ - broker, - shared_subscription_group, - binary_to_existing_atom(Group), - strategy - ]) + try binary_to_existing_atom(Group) of + GroupAtom -> + Key = [broker, shared_subscription_group, GroupAtom, strategy], + case emqx:get_config(Key, ?CONFIG_NOT_FOUND_MAGIC) of + ?CONFIG_NOT_FOUND_MAGIC -> get_default_shared_subscription_strategy(); + Strategy -> Strategy + end catch - error:{config_not_found, _} -> - get_default_shared_subscription_strategy(); error:badarg -> get_default_shared_subscription_strategy() end. --spec ack_enabled() -> boolean(). -ack_enabled() -> - emqx:get_config([broker, shared_dispatch_ack_enabled], false). - do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise SubPid ! {deliver, Topic, Msg}, @@ -187,14 +182,8 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack send(SubPid, Topic, {deliver, Topic, Msg}); -do_dispatch(SubPid, Group, Topic, Msg, fresh) -> - case ack_enabled() of - true -> - %% TODO: delete this clase after 5.1.0 - do_dispatch_with_ack(SubPid, Group, Topic, Msg); - false -> - send(SubPid, Topic, {deliver, Topic, Msg}) - end. +do_dispatch(SubPid, _Group, Topic, Msg, fresh) -> + send(SubPid, Topic, {deliver, Topic, Msg}). -spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) -> ok | {error, _}. @@ -240,7 +229,7 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> with_redispatch_to(Msg, Group, Topic) -> emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). -%% @hidden Redispatch is neede only for the messages with redispatch_to header added. +%% @hidden Redispatch is needed only for the messages with redispatch_to header added. is_redispatch_needed(#message{} = Msg) -> case get_redispatch_to(Msg) of ?REDISPATCH_TO(_, _) -> @@ -555,4 +544,4 @@ delete_route_if_needed({Group, Topic} = GroupTopic) -> end). get_default_shared_subscription_strategy() -> - emqx:get_config([broker, shared_subscription_strategy]). + emqx:get_config([mqtt, shared_subscription_strategy]). diff --git a/changes/ce/feat-11034.en.md b/changes/ce/feat-11034.en.md new file mode 100644 index 000000000..905fd173f --- /dev/null +++ b/changes/ce/feat-11034.en.md @@ -0,0 +1 @@ +Hide the broker and move the `broker.shared_subscription_strategy` to `mqtt.shared_subscription_strategy` as it belongs to `mqtt`.