diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index a7e161915..25760be08 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -50,6 +50,7 @@ File format: * Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` * Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. * Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] +* Auto subscribe to an empty topic will be simply ignored now ## v4.3.12 ### Important changes diff --git a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl index 06178aee7..1b6a2c1c7 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). %% emqx_gen_mod callbacks -export([ load/1 @@ -38,14 +39,33 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> - Replace = fun(Topic) -> - rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) + + OptFun = case ProtoVer of + ?MQTT_PROTO_V5 -> fun(X) -> X end; + _ -> fun(#{qos := Qos}) -> #{qos => Qos} end end, - TopicFilters = case ProtoVer of - ?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; - _ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] - end, - self() ! {subscribe, TopicFilters}. + + Fold = fun({Topic, SubOpts}, Acc) -> + case rep(Topic, ClientId, Username) of + {error, Reason} -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n", + [Topic, Reason]), + Acc; + <<>> -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts" + " reason: topic can't be empty~n", + [Topic]), + Acc; + NewTopic -> + [{NewTopic, OptFun(SubOpts)} | Acc] + end + end, + + case lists:foldl(Fold, [], Topics) of + [] -> ok; + TopicFilters -> + self() ! {subscribe, TopicFilters} + end. unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). @@ -56,10 +76,24 @@ description() -> %% Internal functions %%-------------------------------------------------------------------- -rep(<<"%c">>, ClientId, Topic) -> - emqx_topic:feed_var(<<"%c">>, ClientId, Topic); -rep(<<"%u">>, undefined, Topic) -> - Topic; -rep(<<"%u">>, Username, Topic) -> - emqx_topic:feed_var(<<"%u">>, Username, Topic). +rep(Topic, ClientId, Username) -> + Words = emqx_topic:words(Topic), + rep(Words, ClientId, Username, []). +rep([<<"%c">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [ClientId | Acc]); +rep([<<"%u">> | _], _, undefined, _) -> + {error, username_undefined}; +rep([<<"%u">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [Username | Acc]); +rep([H | T], ClientId, UserName, Acc) -> + rep(T, ClientId, UserName, [H | Acc]); + +rep([], _, _, Acc) -> + emqx_topic:join(lists:reverse(Acc)). diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index a82421aec..01b9c6651 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,29 +1,39 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + [{"4.3.4", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-3]">>, - [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{update,emqx_mod_delayed,{advanced,[]}}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {update,emqx_mod_delayed,{advanced,[]}}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.4",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + [{"4.3.4", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-3]">>, - [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{update,emqx_mod_delayed,{advanced,[]}}, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {update,emqx_mod_delayed,{advanced,[]}}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}.