Merge pull request #8294 from lafirest/feat/exclusive_sub

feat: add exclusive subscription
This commit is contained in:
lafirest 2022-06-23 17:49:04 +08:00 committed by GitHub
commit 4909973e37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 379 additions and 70 deletions

View File

@ -742,6 +742,11 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
mqtt.exclusive_subscription = false
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
@ -847,6 +852,11 @@ zone.external.force_gc_policy = 16000|16MB
## Value: boolean
## zone.external.shared_subscription = false
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
## zone.external.exclusive_subscription = false
## Server Keep Alive
##
## Value: Number
@ -1049,6 +1059,11 @@ zone.internal.acl_deny_action = ignore
## Value: boolean
## zone.internal.shared_subscription = true
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
## zone.internal.exclusive_subscription = false
## See zone.$name.max_subscriptions.
##
## Value: Integer

View File

@ -945,6 +945,12 @@ end}.
{datatype, string}
]}.
%% @doc Whether the Server supports Exclusive Subscriptions.
{mapping, "mqtt.exclusive_subscription", "emqx.exclusive_subscription", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
@ -1199,6 +1205,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether the Server supports Exclusive Subscriptions.
{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.zones", fun(Conf) ->
Ratelimit = fun(Val) ->
[L, D] = string:tokens(Val, ", "),

View File

@ -2,13 +2,32 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.16",
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.3.15",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -26,7 +45,11 @@
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.14",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
@ -47,7 +70,11 @@
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.13",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -75,7 +102,11 @@
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{"4.3.12",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -107,7 +138,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -141,7 +176,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -175,7 +214,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -213,7 +256,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -251,7 +298,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -289,7 +340,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -327,7 +382,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -365,7 +424,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -403,7 +466,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
@ -441,7 +508,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
@ -479,7 +550,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
@ -519,7 +594,11 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{add_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
@ -563,13 +642,31 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.16",
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.15",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -584,9 +681,13 @@
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.14",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
@ -604,9 +705,13 @@
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.13",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -631,9 +736,13 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.12",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -661,9 +770,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.11",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -693,9 +806,13 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.10",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -725,9 +842,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.9",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -761,9 +882,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.8",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -797,9 +922,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.7",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -833,9 +962,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.6",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -869,9 +1002,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.5",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -905,9 +1042,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.4",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
@ -941,9 +1082,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.3",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
@ -977,9 +1122,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.2",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
@ -1013,9 +1162,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.1",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
@ -1051,9 +1204,13 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.0",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
@ -1091,5 +1248,6 @@
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{<<".*">>,[]}]}.

View File

@ -180,7 +180,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts).
do_unsubscribe(Group, Topic, SubPid, SubOpts),
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
case maps:get(shard, SubOpts, 0) of
@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -1483,8 +1483,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
%%--------------------------------------------------------------------
%% Check Sub Caps
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
%%--------------------------------------------------------------------
%% Enrich SubId

View File

@ -0,0 +1,103 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exclusive_subscription).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[exclusive]").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([
check_subscribe/2,
unsubscribe/2
]).
-record(exclusive_subscription, {
topic :: emqx_types:topic(),
clientid :: emqx_types:clientid()
}).
-define(TAB, emqx_exclusive_subscription).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
StoreProps = [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
],
ok = ekka_mnesia:create_table(?TAB, [
{type, set},
{ram_copies, [node()]},
{record_name, exclusive_subscription},
{attributes, record_info(fields, exclusive_subscription)},
{storage_properties, StoreProps}
]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
allow | deny.
check_subscribe(#{clientid := ClientId}, Topic) ->
Fun = fun() ->
try_subscribe(ClientId, Topic)
end,
case mnesia:transaction(Fun) of
{atomic, Res} ->
Res;
{aborted, Reason} ->
?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]),
deny
end.
unsubscribe(Topic, #{is_exclusive := true}) ->
_ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end),
ok;
unsubscribe(_Topic, _SubOpts) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
try_subscribe(ClientId, Topic) ->
case mnesia:wread({?TAB, Topic}) of
[] ->
mnesia:write(
?TAB,
#exclusive_subscription{
clientid = ClientId,
topic = Topic
},
write
),
allow;
[_] ->
deny
end.

View File

@ -43,7 +43,8 @@
retain_available => boolean(),
wildcard_subscription => boolean(),
subscription_identifiers => boolean(),
shared_subscription => boolean()
shared_subscription => boolean(),
exclusive_subscription => boolean()
}).
-define(UNLIMITED, 0).
@ -56,7 +57,8 @@
-define(SUBCAP_KEYS, [max_topic_levels,
max_qos_allowed,
wildcard_subscription,
shared_subscription
shared_subscription,
exclusive_subscription
]).
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
@ -67,7 +69,8 @@
retain_available => true,
wildcard_subscription => true,
subscription_identifiers => true,
shared_subscription => true
shared_subscription => true,
exclusive_subscription => false
}).
-spec(check_pub(emqx_types:zone(),
@ -93,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(_Flags, _Caps) -> ok.
-spec(check_sub(emqx_types:zone(),
-spec(check_sub(emqx_types:clientinfo(),
emqx_types:topic(),
emqx_types:subopts())
-> ok_or_error(emqx_types:reason_code())).
check_sub(Zone, Topic, SubOpts) ->
check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
Caps = get_caps(Zone, subscribe),
Flags = lists:foldl(
fun(max_topic_levels, Map) ->
@ -106,18 +109,29 @@ check_sub(Zone, Topic, SubOpts) ->
Map#{is_wildcard => emqx_topic:wildcard(Topic)};
(shared_subscription, Map) ->
Map#{is_shared => maps:is_key(share, SubOpts)};
(exclusive_subscription, Map) ->
Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
(_Key, Map) -> Map %% Ignore
end, #{}, maps:keys(Caps)),
do_check_sub(Flags, Caps).
do_check_sub(Flags, Caps, ClientInfo, Topic).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _)
when Limit > 0, Levels > Limit ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) -> ok.
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
deny ->
{error, ?RC_QUOTA_EXCEEDED};
_ ->
ok
end;
do_check_sub(_Flags, _Caps, _, _) -> ok.
default_caps() ->
?DEFAULT_CAPS.

View File

@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
case Topic of
<<>> ->
error({invalid_topic_filter, TopicFilter});
_ ->
{Topic, Options#{is_exclusive => true}}
end;
parse(TopicFilter, Options) ->
{TopicFilter, Options}.

View File

@ -53,11 +53,12 @@ t_check_sub(_) ->
},
emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
timer:sleep(50),
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
ClientInfo = #{zone => zone},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})),
emqx_zone:unset_env(zone, '$mqtt_pub_caps').