diff --git a/etc/emqx.conf b/etc/emqx.conf index a4892c610..94d5cbecb 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -742,6 +742,11 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +mqtt.exclusive_subscription = false + ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) ## ## Value: true | false @@ -847,6 +852,11 @@ zone.external.force_gc_policy = 16000|16MB ## Value: boolean ## zone.external.shared_subscription = false +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.external.exclusive_subscription = false + ## Server Keep Alive ## ## Value: Number @@ -1049,6 +1059,11 @@ zone.internal.acl_deny_action = ignore ## Value: boolean ## zone.internal.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.internal.exclusive_subscription = false + ## See zone.$name.max_subscriptions. ## ## Value: Integer diff --git a/priv/emqx.schema b/priv/emqx.schema index 576177c5d..b3fd916a8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -945,6 +945,12 @@ end}. {datatype, string} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "mqtt.exclusive_subscription", "emqx.exclusive_subscription", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1199,6 +1205,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx.zones", fun(Conf) -> Ratelimit = fun(Val) -> [L, D] = string:tokens(Val, ", "), diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 827a4663b..db68145cb 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,13 +2,32 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.16", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.3.15", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -26,7 +45,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.14", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -47,7 +70,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -75,7 +102,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -107,7 +138,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -141,7 +176,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -175,7 +214,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -213,7 +256,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -251,7 +298,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -289,7 +340,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -327,7 +382,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -365,7 +424,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -403,7 +466,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -441,7 +508,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -479,7 +550,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -519,7 +594,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -563,13 +642,31 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.16", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, - {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.15", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -584,9 +681,13 @@ {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, - {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.14", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, @@ -604,9 +705,13 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, - {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.13", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -631,9 +736,13 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, - {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.12", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -661,9 +770,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.11", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -693,9 +806,13 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.10", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -725,9 +842,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.9", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -761,9 +882,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.8", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -797,9 +922,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.7", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -833,9 +962,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.6", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -869,9 +1002,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.5", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -905,9 +1042,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.4", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -941,9 +1082,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.3", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -977,9 +1122,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.2", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -1013,9 +1162,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.1", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -1051,9 +1204,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.0", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -1091,5 +1248,6 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {<<".*">>,[]}]}. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b93227990..876ba07eb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -180,7 +180,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of @@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c667be30e..8ac537b69 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1483,8 +1483,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/src/emqx_exclusive_subscription.erl b/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..ab26a6bd6 --- /dev/null +++ b/src/emqx_exclusive_subscription.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(ClientId, Topic) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 8e94d25a7..f218fa795 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,8 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), + exclusive_subscription => boolean() }). -define(UNLIMITED, 0). @@ -56,7 +57,8 @@ -define(SUBCAP_KEYS, [max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, @@ -67,7 +69,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => false }). -spec(check_pub(emqx_types:zone(), @@ -93,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), +-spec(check_sub(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> ok_or_error(emqx_types:reason_code())). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = get_caps(Zone, subscribe), Flags = lists:foldl( fun(max_topic_levels, Map) -> @@ -106,18 +109,29 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(_Flags, _Caps) -> ok. +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. default_caps() -> ?DEFAULT_CAPS. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 8c6ac7225..3d758d32e 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. - diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 992e7743e..c2b1a477b 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -53,11 +53,12 @@ t_check_sub(_) -> }, emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), + ClientInfo = #{zone => zone}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)), ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})), emqx_zone:unset_env(zone, '$mqtt_pub_caps').