From b5411da770b1251bc05cfc61215777286748a285 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Jun 2023 10:22:14 +0800 Subject: [PATCH] refactor: subscribe process to fix shared-sub --- README-CN.md | 2 +- apps/emqx/include/emqx.hrl | 3 - apps/emqx/include/emqx_mqtt.hrl | 18 +- apps/emqx/src/emqx_broker.erl | 88 +++-- apps/emqx/src/emqx_channel.erl | 305 +++++++++++------- apps/emqx/src/emqx_mountpoint.erl | 47 ++- apps/emqx/src/emqx_mqtt_caps.erl | 15 +- apps/emqx/src/emqx_reason_codes.erl | 1 + apps/emqx/src/emqx_session.erl | 14 +- apps/emqx/src/emqx_session_mem.erl | 2 +- apps/emqx/src/emqx_topic.erl | 89 +++-- apps/emqx/src/emqx_types.erl | 9 +- apps/emqx/test/emqx_broker_SUITE.erl | 28 +- apps/emqx/test/emqx_channel_SUITE.erl | 11 +- apps/emqx/test/emqx_mountpoint_SUITE.erl | 39 +++ apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 4 +- apps/emqx/test/emqx_shared_sub_SUITE.erl | 189 +++++++---- apps/emqx/test/emqx_topic_SUITE.erl | 12 +- .../src/emqx_bridge_mqtt_connector.erl | 2 +- apps/emqx_exhook/src/emqx_exhook_handler.erl | 14 +- .../test/props/prop_exhook_hooks.erl | 1 + .../src/emqx_mgmt_api_clients.erl | 11 +- .../src/emqx_mgmt_api_subscriptions.erl | 41 +-- .../src/emqx_mgmt_api_topics.erl | 18 +- apps/emqx_modules/src/emqx_rewrite.erl | 3 +- apps/emqx_retainer/src/emqx_retainer.erl | 3 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 8 +- changes/ce/fix-10976.en.md | 2 + 28 files changed, 636 insertions(+), 343 deletions(-) create mode 100644 changes/ce/fix-10976.en.md diff --git a/README-CN.md b/README-CN.md index 8c6f8d8c3..f989b9bed 100644 --- a/README-CN.md +++ b/README-CN.md @@ -77,7 +77,7 @@ EMQX Cloud 文档:[docs.emqx.com/zh/cloud/latest/](https://docs.emqx.com/zh/cl 优雅的跨平台 MQTT 5.0 客户端工具,提供了桌面端、命令行、Web 三种版本,帮助您更快的开发和调试 MQTT 服务和应用。 -- [车联网平台搭建从入门到精通 ](https://www.emqx.com/zh/blog/category/internet-of-vehicles) +- [车联网平台搭建从入门到精通](https://www.emqx.com/zh/blog/category/internet-of-vehicles) 结合 EMQ 在车联网领域的实践经验,从协议选择等理论知识,到平台架构设计等实战操作,分享如何搭建一个可靠、高效、符合行业场景需求的车联网平台。 diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 664ec5803..3650488dd 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -39,9 +39,6 @@ %% System topic -define(SYSTOP, <<"$SYS/">>). -%% Queue topic --define(QUEUE, <<"$queue/">>). - %%-------------------------------------------------------------------- %% alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 4d0188f71..93c70a6e1 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -55,6 +55,17 @@ %% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3] -define(MAX_TOPIC_LEN, 65535). +%%-------------------------------------------------------------------- +%% MQTT Share-Sub Internal +%%-------------------------------------------------------------------- + +-record(share, {group :: emqx_types:group(), topic :: emqx_types:topic()}). + +%% guards +-define(IS_TOPIC(T), + (is_binary(T) orelse is_record(T, share)) +). + %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- @@ -661,13 +672,8 @@ end). -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). -define(SHARE, "$share"). +-define(QUEUE, "$queue"). -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). --define(IS_SHARE(Topic), - case Topic of - <> -> true; - _ -> false - end -). -define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). -define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 403e3757f..cc9cb98a6 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -118,18 +118,20 @@ create_tabs() -> %% Subscribe API %%------------------------------------------------------------------------------ --spec subscribe(emqx_types:topic()) -> ok. -subscribe(Topic) when is_binary(Topic) -> +-spec subscribe(emqx_types:topic() | emqx_types:share()) -> ok. +subscribe(Topic) when ?IS_TOPIC(Topic) -> subscribe(Topic, undefined). --spec subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok. -subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) -> +-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid() | emqx_types:subopts()) -> + ok. +subscribe(Topic, SubId) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId) -> subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); -subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> +subscribe(Topic, SubOpts) when ?IS_TOPIC(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). --spec subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok. -subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> +-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid(), emqx_types:subopts()) -> + ok. +subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), _ = emqx_trace:subscribe(Topic, SubId, SubOpts), SubPid = self(), @@ -151,13 +153,13 @@ with_subid(undefined, SubOpts) -> with_subid(SubId, SubOpts) -> maps:put(subid, SubId, SubOpts). -%% @private do_subscribe(Topic, SubPid, SubOpts) -> true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - Group = maps:get(share, SubOpts, undefined), - do_subscribe(Group, Topic, SubPid, SubOpts). + do_subscribe2(Topic, SubPid, SubOpts). -do_subscribe(undefined, Topic, SubPid, SubOpts) -> +do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) -> + %% FIXME: subscribe shard bug + %% https://emqx.atlassian.net/browse/EMQX-10214 case emqx_broker_helper:get_sub_shard(SubPid, Topic) of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), @@ -168,34 +170,40 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), call(pick({Topic, I}), {subscribe, Topic, I}) end; -%% Shared subscription -do_subscribe(Group, Topic, SubPid, SubOpts) -> +do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when + is_binary(RealTopic) +-> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid). + emqx_shared_sub:subscribe(Group, RealTopic, SubPid). %%-------------------------------------------------------------------- %% Unsubscribe API %%-------------------------------------------------------------------- --spec unsubscribe(emqx_types:topic()) -> ok. -unsubscribe(Topic) when is_binary(Topic) -> +-spec unsubscribe(emqx_types:topic() | emqx_types:share()) -> ok. +unsubscribe(Topic) when ?IS_TOPIC(Topic) -> SubPid = self(), case ets:lookup(?SUBOPTION, {Topic, SubPid}) of [{_, SubOpts}] -> - _ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_trace:unsubscribe(Topic, SubOpts), do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok end. +-spec do_unsubscribe(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) -> + ok. do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), - Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe2(Topic, SubPid, SubOpts). -do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> +-spec do_unsubscribe2(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) -> + ok. +do_unsubscribe2(Topic, SubPid, SubOpts) when + is_binary(Topic), is_pid(SubPid), is_map(SubOpts) +-> + _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), @@ -205,7 +213,9 @@ do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; -do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> +do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when + is_binary(Group), is_binary(Topic), is_pid(SubPid) +-> emqx_shared_sub:unsubscribe(Group, Topic, SubPid). %%-------------------------------------------------------------------- @@ -306,7 +316,9 @@ aggre([], true, Acc) -> lists:usort(Acc). %% @doc Forward message to another node. --spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) -> +-spec forward( + node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async +) -> emqx_types:deliver_result(). forward(Node, To, Delivery, async) -> true = emqx_broker_proto_v1:forward_async(Node, To, Delivery), @@ -329,7 +341,8 @@ forward(Node, To, Delivery, sync) -> Result end. --spec dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) -> + emqx_types:deliver_result(). dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) -> case emqx:is_running() of true -> @@ -353,7 +366,11 @@ inc_dropped_cnt(Msg) -> end. -compile({inline, [subscribers/1]}). --spec subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()}) -> +-spec subscribers( + emqx_types:topic() + | emqx_types:share() + | {shard, emqx_types:topic() | emqx_types:share(), non_neg_integer()} +) -> [pid()]. subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); @@ -372,7 +389,7 @@ subscriber_down(SubPid) -> SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {Topic, SubPid}), - do_unsubscribe(undefined, Topic, SubPid, SubOpts); + do_unsubscribe2(Topic, SubPid, SubOpts); undefined -> ok end @@ -386,7 +403,7 @@ subscriber_down(SubPid) -> %%-------------------------------------------------------------------- -spec subscriptions(pid() | emqx_types:subid()) -> - [{emqx_types:topic(), emqx_types:subopts()}]. + [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]. subscriptions(SubPid) when is_pid(SubPid) -> [ {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})} @@ -400,20 +417,22 @@ subscriptions(SubId) -> [] end. --spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]. +-spec subscriptions_via_topic(emqx_types:topic() | emqx_types:share()) -> [emqx_types:subopts()]. subscriptions_via_topic(Topic) -> MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}], ets:select(?SUBOPTION, MatchSpec). --spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean(). +-spec subscribed( + pid() | emqx_types:subid(), emqx_types:topic() | emqx_types:share() +) -> boolean(). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {Topic, SubPid}); subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> SubPid = emqx_broker_helper:lookup_subpid(SubId), ets:member(?SUBOPTION, {Topic, SubPid}). --spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()). -get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> +-spec get_subopts(pid(), emqx_types:topic() | emqx_types:share()) -> maybe(emqx_types:subopts()). +get_subopts(SubPid, Topic) when is_pid(SubPid), ?IS_TOPIC(Topic) -> lookup_value(?SUBOPTION, {Topic, SubPid}); get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> case emqx_broker_helper:lookup_subpid(SubId) of @@ -423,7 +442,7 @@ get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> undefined end. --spec set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean(). +-spec set_subopts(emqx_types:topic() | emqx_types:share(), emqx_types:subopts()) -> boolean(). set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> set_subopts(self(), Topic, NewOpts). @@ -437,7 +456,7 @@ set_subopts(SubPid, Topic, NewOpts) -> false end. --spec topics() -> [emqx_types:topic()]. +-spec topics() -> [emqx_types:topic() | emqx_types:share()]. topics() -> emqx_router:topics(). @@ -542,7 +561,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- --spec do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec do_dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) -> + emqx_types:deliver_result(). do_dispatch(Topic, #delivery{message = Msg}) -> DispN = lists:foldl( fun(Sub, N) -> @@ -560,6 +580,8 @@ do_dispatch(Topic, #delivery{message = Msg}) -> {ok, DispN} end. +%% Donot dispatch to share subscriber here. +%% we do it in `emqx_shared_sub.erl` with configured strategy do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4f6d5ac6f..61b31c6e1 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -476,60 +476,27 @@ handle_in( ok = emqx_metrics:inc('packets.pubcomp.missed'), {ok, Channel} end; -handle_in( - SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{clientinfo = ClientInfo} -) -> - case emqx_packet:check(SubPkt) of - ok -> - TopicFilters0 = parse_topic_filters(TopicFilters), - TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0), - TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel), - HasAuthzDeny = lists:any( - fun({_TopicFilter, ReasonCode}) -> - ReasonCode =:= ?RC_NOT_AUTHORIZED - end, - TupleTopicFilters0 - ), - DenyAction = emqx:get_config([authorization, deny_action], ignore), - case DenyAction =:= disconnect andalso HasAuthzDeny of - true -> - handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); - false -> - TopicFilters2 = [ - TopicFilter - || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0 - ], - TopicFilters3 = run_hooks( - 'client.subscribe', - [ClientInfo, Properties], - TopicFilters2 - ), - {TupleTopicFilters1, NChannel} = process_subscribe( - TopicFilters3, - Properties, - Channel - ), - TupleTopicFilters2 = - lists:foldl( - fun - ({{Topic, Opts = #{deny_subscription := true}}, _QoS}, Acc) -> - Key = {Topic, maps:without([deny_subscription], Opts)}, - lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR}); - (Tuple = {Key, _Value}, Acc) -> - lists:keyreplace(Key, 1, Acc, Tuple) - end, - TupleTopicFilters0, - TupleTopicFilters1 - ), - ReasonCodes2 = [ - ReasonCode - || {_TopicFilter, ReasonCode} <- TupleTopicFilters2 - ], - handle_out(suback, {PacketId, ReasonCodes2}, NChannel) - end; - {error, ReasonCode} -> - handle_out(disconnect, ReasonCode, Channel) +handle_in(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) -> + Pipe = pipeline( + [ + fun check_subscribe/2, + fun enrich_subscribe/2, + %% TODO && FIXME (EMQX-10786): mount topic before authz check. + fun check_sub_authzs/2, + fun check_sub_caps/2 + ], + SubPkt, + Channel0 + ), + case Pipe of + {ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} -> + {TFSubedWithNRC, NChannel} = process_subscribe(run_sub_hooks(NPkt, Channel), Channel), + ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC), + handle_out(suback, {PacketId, ReasonCodes}, NChannel); + {error, {disconnect, RC}, Channel} -> + %% funcs in pipeline always cause action: `disconnect` + %% And Only one ReasonCode in DISCONNECT packet + handle_out(disconnect, RC, Channel) end; handle_in( Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), @@ -540,7 +507,7 @@ handle_in( TopicFilters1 = run_hooks( 'client.unsubscribe', [ClientInfo, Properties], - parse_topic_filters(TopicFilters) + parse_raw_topic_filters(TopicFilters) ), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); @@ -782,32 +749,14 @@ after_message_acked(ClientInfo, Msg, PubAckProps) -> %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [process_subscribe/3]}). -process_subscribe(TopicFilters, SubProps, Channel) -> - process_subscribe(TopicFilters, SubProps, Channel, []). +process_subscribe(TopicFilters, Channel) -> + process_subscribe(TopicFilters, Channel, []). -process_subscribe([], _SubProps, Channel, Acc) -> +process_subscribe([], Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) -> - case check_sub_caps(TopicFilter, SubOpts, Channel) of - ok -> - {ReasonCode, NChannel} = do_subscribe( - TopicFilter, - SubOpts#{sub_props => SubProps}, - Channel - ), - process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]); - {error, ReasonCode} -> - ?SLOG( - warning, - #{ - msg => "cannot_subscribe_topic_filter", - reason => emqx_reason_codes:name(ReasonCode) - }, - #{topic => TopicFilter} - ), - process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) - end. +process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) -> + {NReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel), + process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]). do_subscribe( TopicFilter, @@ -818,11 +767,13 @@ do_subscribe( session = Session } ) -> + %% TODO && FIXME (EMQX-10786): mount topic before authz check. NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), - case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of + case emqx_session:subscribe(ClientInfo, NTopicFilter, SubOpts, Session) of {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; + %% TODO && FIXME (EMQX-11216): QoS as ReasonCode(max granted QoS) for now + RC = QoS, + {RC, Channel#channel{session = NSession}}; {error, RC} -> ?SLOG( warning, @@ -835,6 +786,30 @@ do_subscribe( {RC, Channel} end. +gen_reason_codes(TFChecked, TFSubedWitNhRC) -> + do_gen_reason_codes([], TFChecked, TFSubedWitNhRC). + +%% Initial RC is `RC_SUCCESS | RC_NOT_AUTHORIZED`, generated by check_sub_authzs/2 +%% And then TF with `RC_SUCCESS` will passing through `process_subscribe/2` and +%% NRC should override the initial RC. +do_gen_reason_codes(Acc, [], []) -> + lists:reverse(Acc); +do_gen_reason_codes( + Acc, + [{_TF, ?RC_SUCCESS} | RestCheckedTF], + [{_TF, NRC} | RestTFWithNRC] +) -> + %% will passing through `process_subscribe/2` + %% use NRC to override IintialRC + do_gen_reason_codes([NRC | Acc], RestCheckedTF, RestTFWithNRC); +do_gen_reason_codes( + Acc, + [{_TF, InitialRC} | RestChecked], + RestTFWithNRC +) -> + %% InitialRC is not `RC_SUCCESS`, use it. + do_gen_reason_codes([InitialRC | Acc], RestChecked, RestTFWithNRC). + %%-------------------------------------------------------------------- %% Process Unsubscribe %%-------------------------------------------------------------------- @@ -1213,13 +1188,8 @@ handle_call(Req, Channel) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_info({subscribe, TopicFilters}, Channel) -> - {_, NChannel} = lists:foldl( - fun({TopicFilter, SubOpts}, {_, ChannelAcc}) -> - do_subscribe(TopicFilter, SubOpts, ChannelAcc) - end, - {[], Channel}, - parse_topic_filters(TopicFilters) - ), + NTopicFilters = enrich_subscribe(TopicFilters, Channel), + {_TopicFiltersWithRC, NChannel} = process_subscribe(NTopicFilters, Channel), {ok, NChannel}; handle_info({unsubscribe, TopicFilters}, Channel) -> {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel), @@ -1857,49 +1827,156 @@ check_pub_caps( ) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). +%%-------------------------------------------------------------------- +%% Check Subscribe Packet + +check_subscribe(SubPkt, _Channel) -> + case emqx_packet:check(SubPkt) of + ok -> ok; + {error, RC} -> {error, {disconnect, RC}} + end. + %%-------------------------------------------------------------------- %% Check Sub Authorization -check_sub_authzs(TopicFilters, Channel) -> - check_sub_authzs(TopicFilters, Channel, []). - check_sub_authzs( - [TopicFilter = {Topic, _} | More], - Channel = #channel{clientinfo = ClientInfo}, - Acc + ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters0), + Channel = #channel{clientinfo = ClientInfo} ) -> + CheckResult = do_check_sub_authzs(TopicFilters0, ClientInfo), + HasAuthzDeny = lists:any( + fun({{_TopicFilter, _SubOpts}, ReasonCode}) -> + ReasonCode =:= ?RC_NOT_AUTHORIZED + end, + CheckResult + ), + DenyAction = emqx:get_config([authorization, deny_action], ignore), + case DenyAction =:= disconnect andalso HasAuthzDeny of + true -> + {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel}; + false -> + {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel} + end. + +do_check_sub_authzs(TopicFilters, ClientInfo) -> + do_check_sub_authzs(ClientInfo, TopicFilters, []). + +do_check_sub_authzs(_ClientInfo, [], Acc) -> + lists:reverse(Acc); +do_check_sub_authzs(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc) -> + %% subsclibe authz check only cares the real topic filter when shared-sub + %% e.g. only check <<"t/#">> for <<"$share/g/t/#">> Action = authz_action(TopicFilter), - case emqx_access_control:authorize(ClientInfo, Action, Topic) of + case + emqx_access_control:authorize( + ClientInfo, + Action, + emqx_topic:get_shared_real_topic(Topic) + ) + of + %% TODO: support maximum QoS granted + %% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7] + %% Not implemented yet: + %% {allow, RC} -> do_check_sub_authzs(ClientInfo, More, [{TopicFilter, RC} | Acc]); allow -> - check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]); + do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_SUCCESS} | Acc]); deny -> - check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) - end; -check_sub_authzs([], _Channel, Acc) -> - lists:reverse(Acc). + do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) + end. %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> - emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). +check_sub_caps( + ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters), + Channel = #channel{clientinfo = ClientInfo} +) -> + CheckResult = do_check_sub_caps(ClientInfo, TopicFilters), + {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}. + +do_check_sub_caps(ClientInfo, TopicFilters) -> + do_check_sub_caps(ClientInfo, TopicFilters, []). + +do_check_sub_caps(_ClientInfo, [], Acc) -> + lists:reverse(Acc); +do_check_sub_caps(ClientInfo, [TopicFilter = {{Topic, SubOpts}, ?RC_SUCCESS} | More], Acc) -> + case emqx_mqtt_caps:check_sub(ClientInfo, Topic, SubOpts) of + ok -> + do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]); + {error, NRC} -> + ?SLOG( + warning, + #{ + msg => "cannot_subscribe_topic_filter", + reason => emqx_reason_codes:name(NRC) + }, + #{topic => Topic} + ), + do_check_sub_caps(ClientInfo, More, [{{Topic, SubOpts}, NRC} | Acc]) + end; +do_check_sub_caps(ClientInfo, [TopicFilter = {{_Topic, _SubOpts}, _OtherRC} | More], Acc) -> + do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]). %%-------------------------------------------------------------------- -%% Enrich SubId +%% Run Subscribe Hooks -enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> - [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -enrich_subopts_subid(_Properties, TopicFilters) -> - TopicFilters. +run_sub_hooks( + ?SUBSCRIBE_PACKET(_PacketId, Properties, TopicFilters0), + _Channel = #channel{clientinfo = ClientInfo} +) -> + TopicFilters = [ + TopicFilter + || {TopicFilter, ?RC_SUCCESS} <- TopicFilters0 + ], + _NTopicFilters = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters). %%-------------------------------------------------------------------- %% Enrich SubOpts -enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> - SubOpts; -enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> +%% for api subscribe without sub-authz check and sub-caps check. +enrich_subscribe(TopicFilters, Channel) when is_list(TopicFilters) -> + do_enrich_subscribe(#{}, TopicFilters, Channel); +%% for mqtt clients sent subscribe packet. +enrich_subscribe(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel) -> + NTopicFilters = do_enrich_subscribe(Properties, TopicFilters, Channel), + {ok, ?SUBSCRIBE_PACKET(PacketId, Properties, NTopicFilters), Channel}. + +do_enrich_subscribe(Properties, TopicFilters, Channel) -> + _NTopicFilters = run_fold( + [ + %% TODO: do try catch with reason code here + fun(TFs, _) -> parse_raw_topic_filters(TFs) end, + fun enrich_subopts_subid/2, + fun enrich_subopts_porps/2, + fun enrich_subopts_flags/2 + ], + TopicFilters, + #{sub_props => Properties, channel => Channel} + ). + +enrich_subopts_subid(TopicFilters, #{sub_props := #{'Subscription-Identifier' := SubId}}) -> + [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; +enrich_subopts_subid(TopicFilters, _State) -> + TopicFilters. + +enrich_subopts_porps(TopicFilters, #{sub_props := SubProps}) -> + [{Topic, SubOpts#{sub_props => SubProps}} || {Topic, SubOpts} <- TopicFilters]. + +enrich_subopts_flags(TopicFilters, #{channel := Channel}) -> + do_enrich_subopts_flags(TopicFilters, Channel). + +do_enrich_subopts_flags(TopicFilters, ?IS_MQTT_V5) -> + [{Topic, merge_default_subopts(SubOpts)} || {Topic, SubOpts} <- TopicFilters]; +do_enrich_subopts_flags(TopicFilters, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> + Rap = flag(IsBridge), NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)), - SubOpts#{rap => flag(IsBridge), nl => NL}. + [ + {Topic, (merge_default_subopts(SubOpts))#{rap => Rap, nl => NL}} + || {Topic, SubOpts} <- TopicFilters + ]. + +merge_default_subopts(SubOpts) -> + maps:merge(?DEFAULT_SUBOPTS, SubOpts). %%-------------------------------------------------------------------- %% Enrich ConnAck Caps @@ -2089,8 +2166,8 @@ maybe_shutdown(Reason, _Intent = shutdown, Channel) -> %%-------------------------------------------------------------------- %% Parse Topic Filters --compile({inline, [parse_topic_filters/1]}). -parse_topic_filters(TopicFilters) -> +%% [{<<"$share/group/topic">>, _SubOpts = #{}} | _] +parse_raw_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_mountpoint.erl b/apps/emqx/src/emqx_mountpoint.erl index 5b5dac954..c19736690 100644 --- a/apps/emqx/src/emqx_mountpoint.erl +++ b/apps/emqx/src/emqx_mountpoint.erl @@ -17,6 +17,7 @@ -module(emqx_mountpoint). -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). -include("emqx_placeholder.hrl"). -include("types.hrl"). @@ -34,38 +35,54 @@ -spec mount(maybe(mountpoint()), Any) -> Any when Any :: emqx_types:topic() + | emqx_types:share() | emqx_types:message() | emqx_types:topic_filters(). mount(undefined, Any) -> Any; -mount(MountPoint, Topic) when is_binary(Topic) -> - prefix(MountPoint, Topic); -mount(MountPoint, Msg = #message{topic = Topic}) -> - Msg#message{topic = prefix(MountPoint, Topic)}; +mount(MountPoint, Topic) when ?IS_TOPIC(Topic) -> + prefix_maybe_share(MountPoint, Topic); +mount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) -> + Msg#message{topic = prefix_maybe_share(MountPoint, Topic)}; mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> - [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. + [{prefix_maybe_share(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. -%% @private --compile({inline, [prefix/2]}). -prefix(MountPoint, Topic) -> - <>. +-spec prefix_maybe_share(maybe(mountpoint()), Any) -> Any when + Any :: + emqx_types:topic() + | emqx_types:share(). +prefix_maybe_share(MountPoint, Topic) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + <>; +prefix_maybe_share(MountPoint, #share{group = Group, topic = Topic}) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + #share{group = Group, topic = prefix_maybe_share(MountPoint, Topic)}. -spec unmount(maybe(mountpoint()), Any) -> Any when Any :: emqx_types:topic() + | emqx_types:share() | emqx_types:message(). unmount(undefined, Any) -> Any; -unmount(MountPoint, Topic) when is_binary(Topic) -> +unmount(MountPoint, Topic) when ?IS_TOPIC(Topic) -> + unmount_maybe_share(MountPoint, Topic); +unmount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) -> + Msg#message{topic = unmount_maybe_share(MountPoint, Topic)}. + +unmount_maybe_share(MountPoint, Topic) when + is_binary(MountPoint) andalso is_binary(Topic) +-> case string:prefix(Topic, MountPoint) of nomatch -> Topic; Topic1 -> Topic1 end; -unmount(MountPoint, Msg = #message{topic = Topic}) -> - case string:prefix(Topic, MountPoint) of - nomatch -> Msg; - Topic1 -> Msg#message{topic = Topic1} - end. +unmount_maybe_share(MountPoint, TopicFilter = #share{topic = Topic}) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + TopicFilter#share{topic = unmount_maybe_share(MountPoint, Topic)}. -spec replvar(maybe(mountpoint()), map()) -> maybe(mountpoint()). replvar(undefined, _Vars) -> diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index 11f495dbd..5cf10691d 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -102,16 +102,19 @@ do_check_pub(_Flags, _Caps) -> -spec check_sub( emqx_types:clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts() ) -> ok_or_error(emqx_types:reason_code()). check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = emqx_config:get_zone_conf(Zone, [mqtt]), Flags = #{ + %% TODO: qos check + %% (max_qos_allowed, Map) -> + %% max_qos_allowed => maps:get(max_qos_allowed, Caps, 2), topic_levels => emqx_topic:levels(Topic), is_wildcard => emqx_topic:wildcard(Topic), - is_shared => maps:is_key(share, SubOpts), + is_shared => erlang:is_record(Topic, share), is_exclusive => maps:get(is_exclusive, SubOpts, false) }, do_check_sub(Flags, Caps, ClientInfo, Topic). @@ -126,13 +129,19 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) when + is_binary(Topic) +-> case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of deny -> {error, ?RC_QUOTA_EXCEEDED}; _ -> ok end; +%% for max_qos_allowed +%% see: RC_GRANTED_QOS_0, RC_GRANTED_QOS_1, RC_GRANTED_QOS_2 +%% do_check_sub(_, _) -> +%% {ok, RC}; do_check_sub(_Flags, _Caps, _, _) -> ok. diff --git a/apps/emqx/src/emqx_reason_codes.erl b/apps/emqx/src/emqx_reason_codes.erl index 77a8c1be2..543a62216 100644 --- a/apps/emqx/src/emqx_reason_codes.erl +++ b/apps/emqx/src/emqx_reason_codes.erl @@ -177,6 +177,7 @@ compat(connack, 16#9D) -> ?CONNACK_SERVER; compat(connack, 16#9F) -> ?CONNACK_SERVER; compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80; +%% TODO: 16#80(qos0) 16#81(qos1) 16#82(qos2) for mqtt-v3.1.1 compat(unsuback, _Code) -> undefined; compat(_Other, _Code) -> undefined. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8bdd47392..e79c30f4a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -259,7 +259,7 @@ destroy(Session) -> -spec subscribe( clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts(), t() ) -> @@ -279,7 +279,7 @@ subscribe(ClientInfo, TopicFilter, SubOpts, Session) -> -spec unsubscribe( clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts(), t() ) -> @@ -409,6 +409,16 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)] end. +enrich_deliver( + ClientInfo, + {deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}}, + UpgradeQoS, + Session +) -> + %% Only QoS_1 and QoS_2 messages added `redispatch_to` header + %% For QoS 0 message, send it as regular dispatch + Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg}, + enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session); enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = ?IMPL(Session):get_subscription(Topic, Session), enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e72feffd5..8279953c1 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -314,7 +314,7 @@ unsubscribe( {error, ?RC_NO_SUBSCRIPTION_EXISTED} end. --spec get_subscription(emqx_types:topic(), session()) -> +-spec get_subscription(emqx_types:topic() | emqx_types:share(), session()) -> emqx_types:subopts() | undefined. get_subscription(Topic, #session{subscriptions = Subs}) -> maps:get(Topic, Subs, undefined). diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 6d232c68d..20dfd4316 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -36,9 +36,16 @@ parse/2 ]). +-export([ + maybe_format_share/1, + get_shared_real_topic/1, + make_shared_record/2 +]). + -type topic() :: emqx_types:topic(). -type word() :: emqx_types:word(). -type words() :: emqx_types:words(). +-type share() :: emqx_types:share(). %% Guards -define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST), @@ -50,7 +57,9 @@ %%-------------------------------------------------------------------- %% @doc Is wildcard topic? --spec wildcard(topic() | words()) -> true | false. +-spec wildcard(topic() | share() | words()) -> true | false. +wildcard(#share{topic = Topic}) when is_binary(Topic) -> + wildcard(Topic); wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); wildcard([]) -> @@ -64,7 +73,7 @@ wildcard([_H | T]) -> %% @doc Match Topic name with filter. -spec match(Name, Filter) -> boolean() when - Name :: topic() | words(), + Name :: topic() | share() | words(), Filter :: topic() | words(). match(<<$$, _/binary>>, <<$+, _/binary>>) -> false; @@ -72,6 +81,8 @@ match(<<$$, _/binary>>, <<$#, _/binary>>) -> false; match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); +match(#share{} = Name, Filter) -> + match_share(Name, Filter); match([], []) -> true; match([H | T1], [H | T2]) -> @@ -87,12 +98,26 @@ match([_H1 | _], []) -> match([], [_H | _T2]) -> false. +-spec match_share(Name, Filter) -> boolean() when + Name :: share(), + Filter :: topic() | share(). +match_share(#share{topic = Name}, Filter) when is_binary(Filter) -> + %% only match real topic filter for normal topic filter. + match(words(Name), words(Filter)); +match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = Filter}) -> + %% Matching real topic filter When subed same share group. + match(words(Name), words(Filter)); +match_share(#share{}, _) -> + %% Otherwise, non-matched. + false. + -spec match_any(Name, [Filter]) -> boolean() when Name :: topic() | words(), Filter :: topic() | words(). match_any(Topic, Filters) -> lists:any(fun(Filter) -> match(Topic, Filter) end, Filters). +%% TODO: validate share topic #share{} for emqx_trace.erl %% @doc Validate topic name or filter -spec validate(topic() | {name | filter, topic()}) -> true. validate(Topic) when is_binary(Topic) -> @@ -107,7 +132,7 @@ validate(_, <<>>) -> validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> %% MQTT-5.0 [MQTT-4.7.3-3] error(topic_too_long); -validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) -> +validate(filter, SharedFilter = <>) -> validate_share(SharedFilter); validate(filter, Filter) when is_binary(Filter) -> validate2(words(Filter)); @@ -139,12 +164,12 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -validate_share(<<"$share/", Rest/binary>>) when +validate_share(<>) when Rest =:= <<>> orelse Rest =:= <<"/">> -> %% MQTT-5.0 [MQTT-4.8.2-1] error(?SHARE_EMPTY_FILTER); -validate_share(<<"$share/", Rest/binary>>) -> +validate_share(<>) -> case binary:split(Rest, <<"/">>) of %% MQTT-5.0 [MQTT-4.8.2-1] [<<>>, _] -> @@ -156,7 +181,7 @@ validate_share(<<"$share/", Rest/binary>>) -> validate_share(ShareName, Filter) end. -validate_share(_, <<"$share/", _Rest/binary>>) -> +validate_share(_, <>) -> error(?SHARE_RECURSIVELY); validate_share(ShareName, Filter) -> case binary:match(ShareName, [<<"+">>, <<"#">>]) of @@ -185,7 +210,9 @@ bin('#') -> <<"#">>; bin(B) when is_binary(B) -> B; bin(L) when is_list(L) -> list_to_binary(L). --spec levels(topic()) -> pos_integer(). +-spec levels(topic() | share()) -> pos_integer(). +levels(#share{topic = Topic}) when is_binary(Topic) -> + levels(Topic); levels(Topic) when is_binary(Topic) -> length(tokens(Topic)). @@ -197,6 +224,8 @@ tokens(Topic) -> %% @doc Split Topic Path to Words -spec words(topic()) -> words(). +words(#share{topic = Topic}) when is_binary(Topic) -> + words(Topic); words(Topic) when is_binary(Topic) -> [word(W) || W <- tokens(Topic)]. @@ -237,26 +266,29 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) -> do_join(TopicAcc, [Word | Words]) -> do_join(<>, Words). --spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}. +-spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}. parse(TopicFilter) when is_binary(TopicFilter) -> parse(TopicFilter, #{}); parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). --spec parse(topic(), map()) -> {topic(), map()}. -parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> - error({invalid_topic_filter, TopicFilter}); -parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> - error({invalid_topic_filter, TopicFilter}); -parse(<<"$queue/", TopicFilter/binary>>, Options) -> - parse(TopicFilter, Options#{share => <<"$queue">>}); -parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> +-spec parse(topic() | share(), map()) -> {topic() | share(), map()}. +%% <<"$queue/[real_topic_filter]>">> equivalent to <<"$share/$queue/[real_topic_filter]">> +%% So the head of `real_topic_filter` MUST NOT be `<<$queue>>` or `<<$share>>` +parse(#share{topic = Topic = <>}, _Options) -> + error({invalid_topic_filter, Topic}); +parse(#share{topic = Topic = <>}, _Options) -> + error({invalid_topic_filter, Topic}); +parse(<>, Options) -> + parse(#share{group = <>, topic = Topic}, Options); +parse(TopicFilter = <>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> error({invalid_topic_filter, TopicFilter}); - [ShareName, Filter] -> - case binary:match(ShareName, [<<"+">>, <<"#">>]) of - nomatch -> parse(Filter, Options#{share => ShareName}); + %% `Group` could be `$share` or `$queue` + [Group, Topic] -> + case binary:match(Group, [<<"+">>, <<"#">>]) of + nomatch -> parse(#share{group = Group, topic = Topic}, Options); _ -> error({invalid_topic_filter, TopicFilter}) end end; @@ -267,5 +299,22 @@ parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> _ -> {Topic, Options#{is_exclusive => true}} end; -parse(TopicFilter, Options) -> +parse(TopicFilter, Options) when + ?IS_TOPIC(TopicFilter) +-> {TopicFilter, Options}. + +get_shared_real_topic(#share{topic = TopicFilter}) -> + TopicFilter; +get_shared_real_topic(TopicFilter) when is_binary(TopicFilter) -> + TopicFilter. + +make_shared_record(Group, Topic) -> + #share{group = Group, topic = Topic}. + +maybe_format_share(#share{group = <>, topic = Topic}) -> + join([<>, Topic]); +maybe_format_share(#share{group = Group, topic = Topic}) -> + join([<>, Group, Topic]); +maybe_format_share(Topic) -> + join([Topic]). diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 504540cf6..dbd788c04 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -40,6 +40,10 @@ words/0 ]). +-export_type([ + share/0 +]). + -export_type([ socktype/0, sockstate/0, @@ -136,11 +140,14 @@ -type subid() :: binary() | atom(). --type group() :: binary() | undefined. +%% '_' for match spec +-type group() :: binary() | '_'. -type topic() :: binary(). -type word() :: '' | '+' | '#' | binary(). -type words() :: list(word()). +-type share() :: #share{}. + -type socktype() :: tcp | udp | ssl | proxy | atom(). -type sockstate() :: idle | running | blocked | closed. -type conninfo() :: #{ diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index a205f6fcd..da108ceef 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -299,7 +299,9 @@ t_nosub_pub(Config) when is_list(Config) -> ?assertEqual(1, emqx_metrics:val('messages.dropped')). t_shared_subscribe({init, Config}) -> - emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}), + emqx_broker:subscribe( + emqx_topic:make_shared_record(<<"group">>, <<"topic">>), <<"clientid">>, #{} + ), ct:sleep(100), Config; t_shared_subscribe(Config) when is_list(Config) -> @@ -316,7 +318,7 @@ t_shared_subscribe(Config) when is_list(Config) -> end ); t_shared_subscribe({'end', _Config}) -> - emqx_broker:unsubscribe(<<"$share/group/topic">>). + emqx_broker:unsubscribe(emqx_topic:make_shared_record(<<"group">>, <<"topic">>)). t_shared_subscribe_2({init, Config}) -> Config; @@ -723,24 +725,6 @@ t_connack_auth_error(Config) when is_list(Config) -> ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), ok. -t_handle_in_empty_client_subscribe_hook({init, Config}) -> - Hook = {?MODULE, client_subscribe_delete_all_hook, []}, - ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100), - Config; -t_handle_in_empty_client_subscribe_hook({'end', _Config}) -> - emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}), - ok; -t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) -> - {ok, C} = emqtt:start_link(), - {ok, _} = emqtt:connect(C), - try - {ok, _, RCs} = emqtt:subscribe(C, <<"t">>), - ?assertEqual([?RC_UNSPECIFIED_ERROR], RCs), - ok - after - emqtt:disconnect(C) - end. - authenticate_deny(_Credentials, _Default) -> {stop, {error, bad_username_or_password}}. @@ -800,7 +784,3 @@ recv_msgs(Count, Msgs) -> after 100 -> Msgs end. - -client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) -> - EmptyFilters = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilter], - {stop, EmptyFilters}. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 8f6a2baaa..c6b4c0518 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -456,7 +456,7 @@ t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}], {[{TopicFilter, ?RC_SUCCESS}], _Channel} = - emqx_channel:process_subscribe(TopicFilters, #{}, channel()). + emqx_channel:process_subscribe(TopicFilters, channel()). t_process_unsubscribe(_) -> ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), @@ -914,7 +914,13 @@ t_check_pub_alias(_) -> t_check_sub_authzs(_) -> emqx_config:put_zone_conf(default, [authorization, enable], true), TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, - [{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()). + SubPkt = ?SUBSCRIBE_PACKET(1, #{}, [TopicFilter]), + CheckedSubPkt = ?SUBSCRIBE_PACKET(1, #{}, [{TopicFilter, ?RC_SUCCESS}]), + Channel = channel(), + ?assertEqual( + {ok, CheckedSubPkt, Channel}, + emqx_channel:check_sub_authzs(SubPkt, Channel) + ). t_enrich_connack_caps(_) -> ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]), @@ -1061,6 +1067,7 @@ clientinfo(InitProps) -> clientid => <<"clientid">>, username => <<"username">>, is_superuser => false, + is_bridge => false, mountpoint => undefined }, InitProps diff --git a/apps/emqx/test/emqx_mountpoint_SUITE.erl b/apps/emqx/test/emqx_mountpoint_SUITE.erl index 6d065d521..0bfde981c 100644 --- a/apps/emqx/test/emqx_mountpoint_SUITE.erl +++ b/apps/emqx/test/emqx_mountpoint_SUITE.erl @@ -29,6 +29,7 @@ ). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -52,6 +53,27 @@ t_mount(_) -> mount(<<"device/1/">>, TopicFilters) ). +t_mount_share(_) -> + T = {TopicFilter, Opts} = emqx_topic:parse(<<"$share/group/topic">>), + TopicFilters = [T], + ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), + + %% should not mount share topic when make message. + Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), + + ?assertEqual( + TopicFilter, + mount(undefined, TopicFilter) + ), + ?assertEqual( + #share{group = <<"group">>, topic = <<"device/1/topic">>}, + mount(<<"device/1/">>, TopicFilter) + ), + ?assertEqual( + [{#share{group = <<"group">>, topic = <<"device/1/topic">>}, Opts}], + mount(<<"device/1/">>, TopicFilters) + ). + t_unmount(_) -> Msg = emqx_message:make(<<"clientid">>, <<"device/1/topic">>, <<"payload">>), ?assertEqual(<<"topic">>, unmount(undefined, <<"topic">>)), @@ -61,6 +83,23 @@ t_unmount(_) -> ?assertEqual(<<"device/1/topic">>, unmount(<<"device/2/">>, <<"device/1/topic">>)), ?assertEqual(Msg#message{topic = <<"device/1/topic">>}, unmount(<<"device/2/">>, Msg)). +t_unmount_share(_) -> + {TopicFilter, _Opts} = emqx_topic:parse(<<"$share/group/topic">>), + MountedTopicFilter = #share{group = <<"group">>, topic = <<"device/1/topic">>}, + + ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), + + %% should not unmount share topic when make message. + Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), + ?assertEqual( + TopicFilter, + unmount(undefined, TopicFilter) + ), + ?assertEqual( + #share{group = <<"group">>, topic = <<"topic">>}, + unmount(<<"device/1/">>, MountedTopicFilter) + ). + t_replvar(_) -> ?assertEqual(undefined, replvar(undefined, #{})), ?assertEqual( diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index 297ee7f7d..e97684b74 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -76,6 +76,8 @@ t_check_sub(_) -> ), ?assertEqual( {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}) + emqx_mqtt_caps:check_sub( + ClientInfo, #share{group = <<"group">>, topic = <<"topic">>}, SubOpts + ) ), emqx_config:put([zones], OldConf). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 4b4535cea..86887eff0 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -137,7 +137,8 @@ t_random_basic(Config) when is_list(Config) -> ClientId = <<"ClientId">>, Topic = <<"foo">>, Payload = <<"hello">>, - emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}), + Group = <<"group1">>, + emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}), MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload), %% wait for the subscription to show up ct:sleep(200), @@ -402,7 +403,7 @@ t_hash(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). -t_hash_clinetid(Config) when is_list(Config) -> +t_hash_clientid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). @@ -528,14 +529,15 @@ last_message(ExpectedPayload, Pids, Timeout) -> t_dispatch(Config) when is_list(Config) -> ok = ensure_config(random), Topic = <<"foo">>, + Group = <<"group1">>, ?assertEqual( {error, no_subscribers}, - emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) + emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}}) ), - emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}), + emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}), ?assertEqual( {ok, 1}, - emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) + emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}}) ). t_uncovered_func(Config) when is_list(Config) -> @@ -991,37 +993,110 @@ t_session_kicked(Config) when is_list(Config) -> ?assertEqual([], collect_msgs(0)), ok. -%% FIXME: currently doesn't work -%% t_different_groups_same_topic({init, Config}) -> -%% TestName = atom_to_binary(?FUNCTION_NAME), -%% ClientId = <>, -%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), -%% {ok, _} = emqtt:connect(C), -%% [{client, C}, {clientid, ClientId} | Config]; -%% t_different_groups_same_topic({'end', Config}) -> -%% C = ?config(client, Config), -%% emqtt:stop(C), -%% ok; -%% t_different_groups_same_topic(Config) when is_list(Config) -> -%% C = ?config(client, Config), -%% ClientId = ?config(clientid, Config), -%% %% Subscribe and unsubscribe to both $queue and $shared topics -%% Topic = <<"t/1">>, -%% SharedTopic0 = <<"$share/aa/", Topic/binary>>, -%% SharedTopic1 = <<"$share/bb/", Topic/binary>>, -%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}), -%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}), +-define(UPDATE_SUB_QOS(ConnPid, Topic, QoS), + ?assertMatch({ok, _, [QoS]}, emqtt:subscribe(ConnPid, {Topic, QoS})) +). -%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), -%% emqx:publish(Message0), -%% ?assertMatch([ {publish, #{payload := <<"hi">>}} -%% , {publish, #{payload := <<"hi">>}} -%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}), +t_different_groups_same_topic({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config]; +t_different_groups_same_topic({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_different_groups_same_topic(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic + GroupA = <<"aa">>, + GroupB = <<"bb">>, + Topic = <<"t/1">>, -%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0), -%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1), + SharedTopicGroupA = ?SHARE(GroupA, Topic), + ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2), + SharedTopicGroupB = ?SHARE(GroupB, Topic), + ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2), -%% ok. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(2, length(emqx_router:match_routes(Topic))) + end + ), + + Message0 = emqx_message:make(ClientId, ?QOS_2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}}, + {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB), + + ok. + +t_different_groups_update_subopts({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config]; +t_different_groups_update_subopts({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_different_groups_update_subopts(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic + Topic = <<"t/1">>, + GroupA = <<"aa">>, + GroupB = <<"bb">>, + SharedTopicGroupA = ?SHARE(GroupA, Topic), + SharedTopicGroupB = ?SHARE(GroupB, Topic), + + Fun = fun(Group, QoS) -> + ?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS), + ?assertMatch( + #{qos := QoS}, + emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic)) + ) + end, + + [Fun(Group, QoS) || QoS <- [?QOS_0, ?QOS_1, ?QOS_2], Group <- [GroupA, GroupB]], + + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(2, length(emqx_router:match_routes(Topic))) + end + ), + + Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}}, + {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB), + + ok. t_queue_subscription({init, Config}) -> TestName = atom_to_binary(?FUNCTION_NAME), @@ -1038,23 +1113,19 @@ t_queue_subscription({'end', Config}) -> t_queue_subscription(Config) when is_list(Config) -> C = ?config(client, Config), ClientId = ?config(clientid, Config), - %% Subscribe and unsubscribe to both $queue and $shared topics + %% Subscribe and unsubscribe to both $queue share and $share/ with same topic Topic = <<"t/1">>, QueueTopic = <<"$queue/", Topic/binary>>, SharedTopic = <<"$share/aa/", Topic/binary>>, - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}), - %% FIXME: we should actually see 2 routes, one for each group - %% ($queue and aa), but currently the latest subscription - %% overwrites the existing one. + ?UPDATE_SUB_QOS(C, QueueTopic, ?QOS_2), + ?UPDATE_SUB_QOS(C, SharedTopic, ?QOS_2), + ?retry( _Sleep0 = 100, _Attempts0 = 50, begin - ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), - %% FIXME: should ensure we have 2 subscriptions - [_] = emqx_router:lookup_routes(Topic) + ?assertEqual(2, length(emqx_router:match_routes(Topic))) end ), @@ -1063,37 +1134,29 @@ t_queue_subscription(Config) when is_list(Config) -> emqx:publish(Message0), ?assertMatch( [ + {publish, #{payload := <<"hi">>}}, {publish, #{payload := <<"hi">>}} - %% FIXME: should receive one message from each group - %% , {publish, #{payload := <<"hi">>}} ], - collect_msgs(5_000) + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} ), {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic), - %% FIXME: return code should be success instead of 17 ("no_subscription_existed") - {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopic), - %% FIXME: this should eventually be true, but currently we leak - %% the previous group subscription... - %% ?retry( - %% _Sleep0 = 100, - %% _Attempts0 = 50, - %% begin - %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), - %% [] = emqx_router:lookup_routes(Topic) - %% end - %% ), + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(0, length(emqx_router:match_routes(Topic))) + end + ), ct:sleep(500), Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>), emqx:publish(Message1), - %% FIXME: we should *not* receive any messages... - %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), - %% This is from the leaked group... - ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{ - routes => ets:tab2list(emqx_route) - }), + %% we should *not* receive any messages. + ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), ok. diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index c49c93fb2..4761ea17d 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -238,11 +238,11 @@ long_topic() -> t_parse(_) -> ?assertError( {invalid_topic_filter, <<"$queue/t">>}, - parse(<<"$queue/t">>, #{share => <<"g">>}) + parse(#share{group = <<"$queue">>, topic = <<"$queue/t">>}, #{}) ), ?assertError( {invalid_topic_filter, <<"$share/g/t">>}, - parse(<<"$share/g/t">>, #{share => <<"g">>}) + parse(#share{group = <<"g">>, topic = <<"$share/g/t">>}, #{}) ), ?assertError( {invalid_topic_filter, <<"$share/t">>}, @@ -254,8 +254,12 @@ t_parse(_) -> ), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), - ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), - ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)), + ?assertEqual( + {#share{group = <<"$queue">>, topic = <<"topic">>}, #{}}, parse(<<"$queue/topic">>) + ), + ?assertEqual( + {#share{group = <<"group">>, topic = <<"topic">>}, #{}}, parse(<<"$share/group/topic">>) + ), %% The '$local' and '$fastlane' topics have been deprecated. ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)), ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index eb81c4b6e..0f21a2593 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -96,7 +96,7 @@ choose_ingress_pool_size( #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} ) -> case emqx_topic:parse(RemoteTopic) of - {_Filter, #{share := _Name}} -> + {#share{} = _Filter, _SubOpts} -> % NOTE: this is shared subscription, many workers may subscribe PoolSize; {_Filter, #{}} when PoolSize > 1 -> diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index b4358969d..64061de3d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) -> Req = #{ clientinfo => clientinfo(ClientInfo), type => Type, - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), result => Bool }, case @@ -191,7 +191,7 @@ on_session_created(ClientInfo, _SessInfo) -> on_session_subscribed(ClientInfo, Topic, SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), subopts => maps:with([qos, share, rh, rap, nl], SubOpts) }, cast('session.subscribed', Req). @@ -199,7 +199,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) -> on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), - topic => Topic + topic => emqx_topic:maybe_format_share(Topic) }, cast('session.unsubscribed', Req). @@ -413,7 +413,13 @@ enrich_header(Headers, Message) -> end. topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + GetQos = fun(SubOpts) -> + maps:get(qos, SubOpts, 0) + end, + [ + #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)} + || {Topic, SubOpts} <- Tfs + ]. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) -> list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 2c9b5bb06..10462d210 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -547,6 +547,7 @@ subopts(SubOpts) -> rh => maps:get(rh, SubOpts, 0), rap => maps:get(rap, SubOpts, 0), nl => maps:get(nl, SubOpts, 0), + %% TOOD: FIXME for share-sub refactored share => maps:get(share, SubOpts, <<>>) }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 6b4793b1c..a0436298e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -629,15 +629,8 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> {200, []}; {Node, Subs} -> Formatter = - fun({Topic, SubOpts}) -> - maps:merge( - #{ - node => Node, - clientid => ClientID, - topic => Topic - }, - maps:with([qos, nl, rap, rh], SubOpts) - ) + fun(_Sub = {Topic, SubOpts}) -> + emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts}) end, {200, lists:map(Formatter, Subs)} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index e6e8bb475..d10c9d068 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -38,8 +39,6 @@ format/2 ]). --define(SUBS_QTABLE, emqx_suboption). - -define(SUBS_QSCHEMA, [ {<<"clientid">>, binary}, {<<"topic">>, binary}, @@ -146,7 +145,7 @@ subscriptions(get, #{query_string := QString}) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - ?SUBS_QTABLE, + ?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -157,7 +156,7 @@ subscriptions(get, #{query_string := QString}) -> {ok, Node1} -> emqx_mgmt_api:node_query( Node1, - ?SUBS_QTABLE, + ?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -177,23 +176,16 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(WhichNode, {{Topic, _Subscriber}, Options}) -> +format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> maps:merge( #{ - topic => get_topic(Topic, Options), - clientid => maps:get(subid, Options, null), + topic => emqx_topic:maybe_format_share(Topic), + clientid => maps:get(subid, SubOpts, null), node => WhichNode }, - maps:with([qos, nl, rap, rh], Options) + maps:with([qos, nl, rap, rh], SubOpts) ). -get_topic(Topic, #{share := <<"$queue">> = Group}) -> - emqx_topic:join([Group, Topic]); -get_topic(Topic, #{share := Group}) -> - emqx_topic:join([<<"$share">>, Group, Topic]); -get_topic(Topic, _) -> - Topic. - %%-------------------------------------------------------------------- %% QueryString to MatchSpec %%-------------------------------------------------------------------- @@ -213,10 +205,18 @@ gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> update_ms(clientid, X, {{Topic, Pid}, Opts}) -> {{Topic, Pid}, Opts#{subid => X}}; -update_ms(topic, X, {{_Topic, Pid}, Opts}) -> +update_ms(topic, X, {{Topic, Pid}, Opts}) when + is_record(Topic, share) +-> + {{#share{group = '_', topic = X}, Pid}, Opts}; +update_ms(topic, X, {{Topic, Pid}, Opts}) when + is_binary(Topic) orelse Topic =:= '_' +-> {{X, Pid}, Opts}; -update_ms(share_group, X, {{Topic, Pid}, Opts}) -> - {{Topic, Pid}, Opts#{share => X}}; +update_ms(share_group, X, {{Topic, Pid}, Opts}) when + not is_record(Topic, share) +-> + {{#share{group = X, topic = Topic}, Pid}, Opts}; update_ms(qos, X, {{Topic, Pid}, Opts}) -> {{Topic, Pid}, Opts#{qos => X}}. @@ -227,5 +227,6 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; -run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> - emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). +run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> + {Filter, _SubOpts} = emqx_topic:parse(TopicFilter), + emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index b6294ecbf..94bedd39f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -17,6 +17,7 @@ -module(emqx_mgmt_api_topics). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -101,10 +102,10 @@ fields(topic) -> %%%============================================================================================== %% parameters trans topics(get, #{query_string := Qs}) -> - do_list(generate_topic(Qs)). + do_list(Qs). topic(get, #{bindings := Bindings}) -> - lookup(generate_topic(Bindings)). + lookup(Bindings). %%%============================================================================================== %% api apply @@ -139,13 +140,6 @@ lookup(#{topic := Topic}) -> %%%============================================================================================== %% internal -generate_topic(Params = #{<<"topic">> := Topic}) -> - Params#{<<"topic">> => Topic}; -generate_topic(Params = #{topic := Topic}) -> - Params#{topic => Topic}; -generate_topic(Params) -> - Params. - -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). qs2ms(_Tab, {Qs, _}) -> #{ @@ -160,9 +154,9 @@ gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). -format(#route{topic = Topic, dest = {_, Node}}) -> - #{topic => Topic, node => Node}; -format(#route{topic = Topic, dest = Node}) -> +format(#route{topic = Topic, dest = {Group, Node}}) -> + #{topic => ?SHARE(Group, Topic), node => Node}; +format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> #{topic => Topic, node => Node}. topic_param(In) -> diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index d91371f7e..619b53be4 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -150,11 +150,12 @@ compile(Rules) -> Rules ). +%% FIXME: rewrite #share{} and return #share{}, not formated $share/group/topic match_and_rewrite(Topic, [], _) -> Topic; match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest, Binds); + true -> rewrite(emqx_topic:get_shared_real_topic(Topic), MP, Dest, Binds); false -> match_and_rewrite(Topic, Rules, Binds) end. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 4976e2400..49831a9e8 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -20,6 +20,7 @@ -include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -export([start_link/0]). @@ -87,7 +88,7 @@ %% Hook API %%------------------------------------------------------------------------------ -spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any(). -on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined -> +on_session_subscribed(_, #share{} = _Topic, _SubOpts, _) -> ok; on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> IsNew = maps:get(is_new, Opts, true), diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 3ff588f48..ac9e077eb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -190,7 +190,9 @@ on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.subscribed', fun() -> - eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) + eventmsg_sub_or_unsub( + 'session.subscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts + ) end, Conf ). @@ -199,7 +201,9 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.unsubscribed', fun() -> - eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) + eventmsg_sub_or_unsub( + 'session.unsubscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts + ) end, Conf ). diff --git a/changes/ce/fix-10976.en.md b/changes/ce/fix-10976.en.md new file mode 100644 index 000000000..87a7b442a --- /dev/null +++ b/changes/ce/fix-10976.en.md @@ -0,0 +1,2 @@ +Fix topic-filter overlapping handling in shared subscription. +In the previous implementation, the storage method for subscription options did not provide adequate support for shared subscriptions. This resulted in message routing failures and leakage of routing tables between nodes during the "subscribe-unsubscribe" process with specific order and topics.