From b5411da770b1251bc05cfc61215777286748a285 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Jun 2023 10:22:14 +0800 Subject: [PATCH 1/9] refactor: subscribe process to fix shared-sub --- README-CN.md | 2 +- apps/emqx/include/emqx.hrl | 3 - apps/emqx/include/emqx_mqtt.hrl | 18 +- apps/emqx/src/emqx_broker.erl | 88 +++-- apps/emqx/src/emqx_channel.erl | 305 +++++++++++------- apps/emqx/src/emqx_mountpoint.erl | 47 ++- apps/emqx/src/emqx_mqtt_caps.erl | 15 +- apps/emqx/src/emqx_reason_codes.erl | 1 + apps/emqx/src/emqx_session.erl | 14 +- apps/emqx/src/emqx_session_mem.erl | 2 +- apps/emqx/src/emqx_topic.erl | 89 +++-- apps/emqx/src/emqx_types.erl | 9 +- apps/emqx/test/emqx_broker_SUITE.erl | 28 +- apps/emqx/test/emqx_channel_SUITE.erl | 11 +- apps/emqx/test/emqx_mountpoint_SUITE.erl | 39 +++ apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 4 +- apps/emqx/test/emqx_shared_sub_SUITE.erl | 189 +++++++---- apps/emqx/test/emqx_topic_SUITE.erl | 12 +- .../src/emqx_bridge_mqtt_connector.erl | 2 +- apps/emqx_exhook/src/emqx_exhook_handler.erl | 14 +- .../test/props/prop_exhook_hooks.erl | 1 + .../src/emqx_mgmt_api_clients.erl | 11 +- .../src/emqx_mgmt_api_subscriptions.erl | 41 +-- .../src/emqx_mgmt_api_topics.erl | 18 +- apps/emqx_modules/src/emqx_rewrite.erl | 3 +- apps/emqx_retainer/src/emqx_retainer.erl | 3 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 8 +- changes/ce/fix-10976.en.md | 2 + 28 files changed, 636 insertions(+), 343 deletions(-) create mode 100644 changes/ce/fix-10976.en.md diff --git a/README-CN.md b/README-CN.md index 8c6f8d8c3..f989b9bed 100644 --- a/README-CN.md +++ b/README-CN.md @@ -77,7 +77,7 @@ EMQX Cloud 文档:[docs.emqx.com/zh/cloud/latest/](https://docs.emqx.com/zh/cl 优雅的跨平台 MQTT 5.0 客户端工具,提供了桌面端、命令行、Web 三种版本,帮助您更快的开发和调试 MQTT 服务和应用。 -- [车联网平台搭建从入门到精通 ](https://www.emqx.com/zh/blog/category/internet-of-vehicles) +- [车联网平台搭建从入门到精通](https://www.emqx.com/zh/blog/category/internet-of-vehicles) 结合 EMQ 在车联网领域的实践经验,从协议选择等理论知识,到平台架构设计等实战操作,分享如何搭建一个可靠、高效、符合行业场景需求的车联网平台。 diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 664ec5803..3650488dd 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -39,9 +39,6 @@ %% System topic -define(SYSTOP, <<"$SYS/">>). -%% Queue topic --define(QUEUE, <<"$queue/">>). - %%-------------------------------------------------------------------- %% alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 4d0188f71..93c70a6e1 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -55,6 +55,17 @@ %% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3] -define(MAX_TOPIC_LEN, 65535). +%%-------------------------------------------------------------------- +%% MQTT Share-Sub Internal +%%-------------------------------------------------------------------- + +-record(share, {group :: emqx_types:group(), topic :: emqx_types:topic()}). + +%% guards +-define(IS_TOPIC(T), + (is_binary(T) orelse is_record(T, share)) +). + %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- @@ -661,13 +672,8 @@ end). -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). -define(SHARE, "$share"). +-define(QUEUE, "$queue"). -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). --define(IS_SHARE(Topic), - case Topic of - <> -> true; - _ -> false - end -). -define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). -define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 403e3757f..cc9cb98a6 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -118,18 +118,20 @@ create_tabs() -> %% Subscribe API %%------------------------------------------------------------------------------ --spec subscribe(emqx_types:topic()) -> ok. -subscribe(Topic) when is_binary(Topic) -> +-spec subscribe(emqx_types:topic() | emqx_types:share()) -> ok. +subscribe(Topic) when ?IS_TOPIC(Topic) -> subscribe(Topic, undefined). --spec subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok. -subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) -> +-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid() | emqx_types:subopts()) -> + ok. +subscribe(Topic, SubId) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId) -> subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); -subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> +subscribe(Topic, SubOpts) when ?IS_TOPIC(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). --spec subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok. -subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> +-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid(), emqx_types:subopts()) -> + ok. +subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), _ = emqx_trace:subscribe(Topic, SubId, SubOpts), SubPid = self(), @@ -151,13 +153,13 @@ with_subid(undefined, SubOpts) -> with_subid(SubId, SubOpts) -> maps:put(subid, SubId, SubOpts). -%% @private do_subscribe(Topic, SubPid, SubOpts) -> true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - Group = maps:get(share, SubOpts, undefined), - do_subscribe(Group, Topic, SubPid, SubOpts). + do_subscribe2(Topic, SubPid, SubOpts). -do_subscribe(undefined, Topic, SubPid, SubOpts) -> +do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) -> + %% FIXME: subscribe shard bug + %% https://emqx.atlassian.net/browse/EMQX-10214 case emqx_broker_helper:get_sub_shard(SubPid, Topic) of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), @@ -168,34 +170,40 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), call(pick({Topic, I}), {subscribe, Topic, I}) end; -%% Shared subscription -do_subscribe(Group, Topic, SubPid, SubOpts) -> +do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when + is_binary(RealTopic) +-> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid). + emqx_shared_sub:subscribe(Group, RealTopic, SubPid). %%-------------------------------------------------------------------- %% Unsubscribe API %%-------------------------------------------------------------------- --spec unsubscribe(emqx_types:topic()) -> ok. -unsubscribe(Topic) when is_binary(Topic) -> +-spec unsubscribe(emqx_types:topic() | emqx_types:share()) -> ok. +unsubscribe(Topic) when ?IS_TOPIC(Topic) -> SubPid = self(), case ets:lookup(?SUBOPTION, {Topic, SubPid}) of [{_, SubOpts}] -> - _ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_trace:unsubscribe(Topic, SubOpts), do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok end. +-spec do_unsubscribe(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) -> + ok. do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), - Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe2(Topic, SubPid, SubOpts). -do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> +-spec do_unsubscribe2(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) -> + ok. +do_unsubscribe2(Topic, SubPid, SubOpts) when + is_binary(Topic), is_pid(SubPid), is_map(SubOpts) +-> + _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), @@ -205,7 +213,9 @@ do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; -do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> +do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when + is_binary(Group), is_binary(Topic), is_pid(SubPid) +-> emqx_shared_sub:unsubscribe(Group, Topic, SubPid). %%-------------------------------------------------------------------- @@ -306,7 +316,9 @@ aggre([], true, Acc) -> lists:usort(Acc). %% @doc Forward message to another node. --spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) -> +-spec forward( + node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async +) -> emqx_types:deliver_result(). forward(Node, To, Delivery, async) -> true = emqx_broker_proto_v1:forward_async(Node, To, Delivery), @@ -329,7 +341,8 @@ forward(Node, To, Delivery, sync) -> Result end. --spec dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) -> + emqx_types:deliver_result(). dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) -> case emqx:is_running() of true -> @@ -353,7 +366,11 @@ inc_dropped_cnt(Msg) -> end. -compile({inline, [subscribers/1]}). --spec subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()}) -> +-spec subscribers( + emqx_types:topic() + | emqx_types:share() + | {shard, emqx_types:topic() | emqx_types:share(), non_neg_integer()} +) -> [pid()]. subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); @@ -372,7 +389,7 @@ subscriber_down(SubPid) -> SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {Topic, SubPid}), - do_unsubscribe(undefined, Topic, SubPid, SubOpts); + do_unsubscribe2(Topic, SubPid, SubOpts); undefined -> ok end @@ -386,7 +403,7 @@ subscriber_down(SubPid) -> %%-------------------------------------------------------------------- -spec subscriptions(pid() | emqx_types:subid()) -> - [{emqx_types:topic(), emqx_types:subopts()}]. + [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]. subscriptions(SubPid) when is_pid(SubPid) -> [ {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})} @@ -400,20 +417,22 @@ subscriptions(SubId) -> [] end. --spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]. +-spec subscriptions_via_topic(emqx_types:topic() | emqx_types:share()) -> [emqx_types:subopts()]. subscriptions_via_topic(Topic) -> MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}], ets:select(?SUBOPTION, MatchSpec). --spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean(). +-spec subscribed( + pid() | emqx_types:subid(), emqx_types:topic() | emqx_types:share() +) -> boolean(). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {Topic, SubPid}); subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> SubPid = emqx_broker_helper:lookup_subpid(SubId), ets:member(?SUBOPTION, {Topic, SubPid}). --spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()). -get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> +-spec get_subopts(pid(), emqx_types:topic() | emqx_types:share()) -> maybe(emqx_types:subopts()). +get_subopts(SubPid, Topic) when is_pid(SubPid), ?IS_TOPIC(Topic) -> lookup_value(?SUBOPTION, {Topic, SubPid}); get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> case emqx_broker_helper:lookup_subpid(SubId) of @@ -423,7 +442,7 @@ get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> undefined end. --spec set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean(). +-spec set_subopts(emqx_types:topic() | emqx_types:share(), emqx_types:subopts()) -> boolean(). set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> set_subopts(self(), Topic, NewOpts). @@ -437,7 +456,7 @@ set_subopts(SubPid, Topic, NewOpts) -> false end. --spec topics() -> [emqx_types:topic()]. +-spec topics() -> [emqx_types:topic() | emqx_types:share()]. topics() -> emqx_router:topics(). @@ -542,7 +561,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- --spec do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec do_dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) -> + emqx_types:deliver_result(). do_dispatch(Topic, #delivery{message = Msg}) -> DispN = lists:foldl( fun(Sub, N) -> @@ -560,6 +580,8 @@ do_dispatch(Topic, #delivery{message = Msg}) -> {ok, DispN} end. +%% Donot dispatch to share subscriber here. +%% we do it in `emqx_shared_sub.erl` with configured strategy do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4f6d5ac6f..61b31c6e1 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -476,60 +476,27 @@ handle_in( ok = emqx_metrics:inc('packets.pubcomp.missed'), {ok, Channel} end; -handle_in( - SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{clientinfo = ClientInfo} -) -> - case emqx_packet:check(SubPkt) of - ok -> - TopicFilters0 = parse_topic_filters(TopicFilters), - TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0), - TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel), - HasAuthzDeny = lists:any( - fun({_TopicFilter, ReasonCode}) -> - ReasonCode =:= ?RC_NOT_AUTHORIZED - end, - TupleTopicFilters0 - ), - DenyAction = emqx:get_config([authorization, deny_action], ignore), - case DenyAction =:= disconnect andalso HasAuthzDeny of - true -> - handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); - false -> - TopicFilters2 = [ - TopicFilter - || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0 - ], - TopicFilters3 = run_hooks( - 'client.subscribe', - [ClientInfo, Properties], - TopicFilters2 - ), - {TupleTopicFilters1, NChannel} = process_subscribe( - TopicFilters3, - Properties, - Channel - ), - TupleTopicFilters2 = - lists:foldl( - fun - ({{Topic, Opts = #{deny_subscription := true}}, _QoS}, Acc) -> - Key = {Topic, maps:without([deny_subscription], Opts)}, - lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR}); - (Tuple = {Key, _Value}, Acc) -> - lists:keyreplace(Key, 1, Acc, Tuple) - end, - TupleTopicFilters0, - TupleTopicFilters1 - ), - ReasonCodes2 = [ - ReasonCode - || {_TopicFilter, ReasonCode} <- TupleTopicFilters2 - ], - handle_out(suback, {PacketId, ReasonCodes2}, NChannel) - end; - {error, ReasonCode} -> - handle_out(disconnect, ReasonCode, Channel) +handle_in(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) -> + Pipe = pipeline( + [ + fun check_subscribe/2, + fun enrich_subscribe/2, + %% TODO && FIXME (EMQX-10786): mount topic before authz check. + fun check_sub_authzs/2, + fun check_sub_caps/2 + ], + SubPkt, + Channel0 + ), + case Pipe of + {ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} -> + {TFSubedWithNRC, NChannel} = process_subscribe(run_sub_hooks(NPkt, Channel), Channel), + ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC), + handle_out(suback, {PacketId, ReasonCodes}, NChannel); + {error, {disconnect, RC}, Channel} -> + %% funcs in pipeline always cause action: `disconnect` + %% And Only one ReasonCode in DISCONNECT packet + handle_out(disconnect, RC, Channel) end; handle_in( Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), @@ -540,7 +507,7 @@ handle_in( TopicFilters1 = run_hooks( 'client.unsubscribe', [ClientInfo, Properties], - parse_topic_filters(TopicFilters) + parse_raw_topic_filters(TopicFilters) ), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); @@ -782,32 +749,14 @@ after_message_acked(ClientInfo, Msg, PubAckProps) -> %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [process_subscribe/3]}). -process_subscribe(TopicFilters, SubProps, Channel) -> - process_subscribe(TopicFilters, SubProps, Channel, []). +process_subscribe(TopicFilters, Channel) -> + process_subscribe(TopicFilters, Channel, []). -process_subscribe([], _SubProps, Channel, Acc) -> +process_subscribe([], Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) -> - case check_sub_caps(TopicFilter, SubOpts, Channel) of - ok -> - {ReasonCode, NChannel} = do_subscribe( - TopicFilter, - SubOpts#{sub_props => SubProps}, - Channel - ), - process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]); - {error, ReasonCode} -> - ?SLOG( - warning, - #{ - msg => "cannot_subscribe_topic_filter", - reason => emqx_reason_codes:name(ReasonCode) - }, - #{topic => TopicFilter} - ), - process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) - end. +process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) -> + {NReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel), + process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]). do_subscribe( TopicFilter, @@ -818,11 +767,13 @@ do_subscribe( session = Session } ) -> + %% TODO && FIXME (EMQX-10786): mount topic before authz check. NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), - case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of + case emqx_session:subscribe(ClientInfo, NTopicFilter, SubOpts, Session) of {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; + %% TODO && FIXME (EMQX-11216): QoS as ReasonCode(max granted QoS) for now + RC = QoS, + {RC, Channel#channel{session = NSession}}; {error, RC} -> ?SLOG( warning, @@ -835,6 +786,30 @@ do_subscribe( {RC, Channel} end. +gen_reason_codes(TFChecked, TFSubedWitNhRC) -> + do_gen_reason_codes([], TFChecked, TFSubedWitNhRC). + +%% Initial RC is `RC_SUCCESS | RC_NOT_AUTHORIZED`, generated by check_sub_authzs/2 +%% And then TF with `RC_SUCCESS` will passing through `process_subscribe/2` and +%% NRC should override the initial RC. +do_gen_reason_codes(Acc, [], []) -> + lists:reverse(Acc); +do_gen_reason_codes( + Acc, + [{_TF, ?RC_SUCCESS} | RestCheckedTF], + [{_TF, NRC} | RestTFWithNRC] +) -> + %% will passing through `process_subscribe/2` + %% use NRC to override IintialRC + do_gen_reason_codes([NRC | Acc], RestCheckedTF, RestTFWithNRC); +do_gen_reason_codes( + Acc, + [{_TF, InitialRC} | RestChecked], + RestTFWithNRC +) -> + %% InitialRC is not `RC_SUCCESS`, use it. + do_gen_reason_codes([InitialRC | Acc], RestChecked, RestTFWithNRC). + %%-------------------------------------------------------------------- %% Process Unsubscribe %%-------------------------------------------------------------------- @@ -1213,13 +1188,8 @@ handle_call(Req, Channel) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_info({subscribe, TopicFilters}, Channel) -> - {_, NChannel} = lists:foldl( - fun({TopicFilter, SubOpts}, {_, ChannelAcc}) -> - do_subscribe(TopicFilter, SubOpts, ChannelAcc) - end, - {[], Channel}, - parse_topic_filters(TopicFilters) - ), + NTopicFilters = enrich_subscribe(TopicFilters, Channel), + {_TopicFiltersWithRC, NChannel} = process_subscribe(NTopicFilters, Channel), {ok, NChannel}; handle_info({unsubscribe, TopicFilters}, Channel) -> {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel), @@ -1857,49 +1827,156 @@ check_pub_caps( ) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). +%%-------------------------------------------------------------------- +%% Check Subscribe Packet + +check_subscribe(SubPkt, _Channel) -> + case emqx_packet:check(SubPkt) of + ok -> ok; + {error, RC} -> {error, {disconnect, RC}} + end. + %%-------------------------------------------------------------------- %% Check Sub Authorization -check_sub_authzs(TopicFilters, Channel) -> - check_sub_authzs(TopicFilters, Channel, []). - check_sub_authzs( - [TopicFilter = {Topic, _} | More], - Channel = #channel{clientinfo = ClientInfo}, - Acc + ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters0), + Channel = #channel{clientinfo = ClientInfo} ) -> + CheckResult = do_check_sub_authzs(TopicFilters0, ClientInfo), + HasAuthzDeny = lists:any( + fun({{_TopicFilter, _SubOpts}, ReasonCode}) -> + ReasonCode =:= ?RC_NOT_AUTHORIZED + end, + CheckResult + ), + DenyAction = emqx:get_config([authorization, deny_action], ignore), + case DenyAction =:= disconnect andalso HasAuthzDeny of + true -> + {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel}; + false -> + {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel} + end. + +do_check_sub_authzs(TopicFilters, ClientInfo) -> + do_check_sub_authzs(ClientInfo, TopicFilters, []). + +do_check_sub_authzs(_ClientInfo, [], Acc) -> + lists:reverse(Acc); +do_check_sub_authzs(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc) -> + %% subsclibe authz check only cares the real topic filter when shared-sub + %% e.g. only check <<"t/#">> for <<"$share/g/t/#">> Action = authz_action(TopicFilter), - case emqx_access_control:authorize(ClientInfo, Action, Topic) of + case + emqx_access_control:authorize( + ClientInfo, + Action, + emqx_topic:get_shared_real_topic(Topic) + ) + of + %% TODO: support maximum QoS granted + %% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7] + %% Not implemented yet: + %% {allow, RC} -> do_check_sub_authzs(ClientInfo, More, [{TopicFilter, RC} | Acc]); allow -> - check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]); + do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_SUCCESS} | Acc]); deny -> - check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) - end; -check_sub_authzs([], _Channel, Acc) -> - lists:reverse(Acc). + do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) + end. %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> - emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). +check_sub_caps( + ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters), + Channel = #channel{clientinfo = ClientInfo} +) -> + CheckResult = do_check_sub_caps(ClientInfo, TopicFilters), + {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}. + +do_check_sub_caps(ClientInfo, TopicFilters) -> + do_check_sub_caps(ClientInfo, TopicFilters, []). + +do_check_sub_caps(_ClientInfo, [], Acc) -> + lists:reverse(Acc); +do_check_sub_caps(ClientInfo, [TopicFilter = {{Topic, SubOpts}, ?RC_SUCCESS} | More], Acc) -> + case emqx_mqtt_caps:check_sub(ClientInfo, Topic, SubOpts) of + ok -> + do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]); + {error, NRC} -> + ?SLOG( + warning, + #{ + msg => "cannot_subscribe_topic_filter", + reason => emqx_reason_codes:name(NRC) + }, + #{topic => Topic} + ), + do_check_sub_caps(ClientInfo, More, [{{Topic, SubOpts}, NRC} | Acc]) + end; +do_check_sub_caps(ClientInfo, [TopicFilter = {{_Topic, _SubOpts}, _OtherRC} | More], Acc) -> + do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]). %%-------------------------------------------------------------------- -%% Enrich SubId +%% Run Subscribe Hooks -enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> - [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -enrich_subopts_subid(_Properties, TopicFilters) -> - TopicFilters. +run_sub_hooks( + ?SUBSCRIBE_PACKET(_PacketId, Properties, TopicFilters0), + _Channel = #channel{clientinfo = ClientInfo} +) -> + TopicFilters = [ + TopicFilter + || {TopicFilter, ?RC_SUCCESS} <- TopicFilters0 + ], + _NTopicFilters = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters). %%-------------------------------------------------------------------- %% Enrich SubOpts -enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> - SubOpts; -enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> +%% for api subscribe without sub-authz check and sub-caps check. +enrich_subscribe(TopicFilters, Channel) when is_list(TopicFilters) -> + do_enrich_subscribe(#{}, TopicFilters, Channel); +%% for mqtt clients sent subscribe packet. +enrich_subscribe(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel) -> + NTopicFilters = do_enrich_subscribe(Properties, TopicFilters, Channel), + {ok, ?SUBSCRIBE_PACKET(PacketId, Properties, NTopicFilters), Channel}. + +do_enrich_subscribe(Properties, TopicFilters, Channel) -> + _NTopicFilters = run_fold( + [ + %% TODO: do try catch with reason code here + fun(TFs, _) -> parse_raw_topic_filters(TFs) end, + fun enrich_subopts_subid/2, + fun enrich_subopts_porps/2, + fun enrich_subopts_flags/2 + ], + TopicFilters, + #{sub_props => Properties, channel => Channel} + ). + +enrich_subopts_subid(TopicFilters, #{sub_props := #{'Subscription-Identifier' := SubId}}) -> + [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; +enrich_subopts_subid(TopicFilters, _State) -> + TopicFilters. + +enrich_subopts_porps(TopicFilters, #{sub_props := SubProps}) -> + [{Topic, SubOpts#{sub_props => SubProps}} || {Topic, SubOpts} <- TopicFilters]. + +enrich_subopts_flags(TopicFilters, #{channel := Channel}) -> + do_enrich_subopts_flags(TopicFilters, Channel). + +do_enrich_subopts_flags(TopicFilters, ?IS_MQTT_V5) -> + [{Topic, merge_default_subopts(SubOpts)} || {Topic, SubOpts} <- TopicFilters]; +do_enrich_subopts_flags(TopicFilters, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> + Rap = flag(IsBridge), NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)), - SubOpts#{rap => flag(IsBridge), nl => NL}. + [ + {Topic, (merge_default_subopts(SubOpts))#{rap => Rap, nl => NL}} + || {Topic, SubOpts} <- TopicFilters + ]. + +merge_default_subopts(SubOpts) -> + maps:merge(?DEFAULT_SUBOPTS, SubOpts). %%-------------------------------------------------------------------- %% Enrich ConnAck Caps @@ -2089,8 +2166,8 @@ maybe_shutdown(Reason, _Intent = shutdown, Channel) -> %%-------------------------------------------------------------------- %% Parse Topic Filters --compile({inline, [parse_topic_filters/1]}). -parse_topic_filters(TopicFilters) -> +%% [{<<"$share/group/topic">>, _SubOpts = #{}} | _] +parse_raw_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_mountpoint.erl b/apps/emqx/src/emqx_mountpoint.erl index 5b5dac954..c19736690 100644 --- a/apps/emqx/src/emqx_mountpoint.erl +++ b/apps/emqx/src/emqx_mountpoint.erl @@ -17,6 +17,7 @@ -module(emqx_mountpoint). -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). -include("emqx_placeholder.hrl"). -include("types.hrl"). @@ -34,38 +35,54 @@ -spec mount(maybe(mountpoint()), Any) -> Any when Any :: emqx_types:topic() + | emqx_types:share() | emqx_types:message() | emqx_types:topic_filters(). mount(undefined, Any) -> Any; -mount(MountPoint, Topic) when is_binary(Topic) -> - prefix(MountPoint, Topic); -mount(MountPoint, Msg = #message{topic = Topic}) -> - Msg#message{topic = prefix(MountPoint, Topic)}; +mount(MountPoint, Topic) when ?IS_TOPIC(Topic) -> + prefix_maybe_share(MountPoint, Topic); +mount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) -> + Msg#message{topic = prefix_maybe_share(MountPoint, Topic)}; mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> - [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. + [{prefix_maybe_share(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. -%% @private --compile({inline, [prefix/2]}). -prefix(MountPoint, Topic) -> - <>. +-spec prefix_maybe_share(maybe(mountpoint()), Any) -> Any when + Any :: + emqx_types:topic() + | emqx_types:share(). +prefix_maybe_share(MountPoint, Topic) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + <>; +prefix_maybe_share(MountPoint, #share{group = Group, topic = Topic}) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + #share{group = Group, topic = prefix_maybe_share(MountPoint, Topic)}. -spec unmount(maybe(mountpoint()), Any) -> Any when Any :: emqx_types:topic() + | emqx_types:share() | emqx_types:message(). unmount(undefined, Any) -> Any; -unmount(MountPoint, Topic) when is_binary(Topic) -> +unmount(MountPoint, Topic) when ?IS_TOPIC(Topic) -> + unmount_maybe_share(MountPoint, Topic); +unmount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) -> + Msg#message{topic = unmount_maybe_share(MountPoint, Topic)}. + +unmount_maybe_share(MountPoint, Topic) when + is_binary(MountPoint) andalso is_binary(Topic) +-> case string:prefix(Topic, MountPoint) of nomatch -> Topic; Topic1 -> Topic1 end; -unmount(MountPoint, Msg = #message{topic = Topic}) -> - case string:prefix(Topic, MountPoint) of - nomatch -> Msg; - Topic1 -> Msg#message{topic = Topic1} - end. +unmount_maybe_share(MountPoint, TopicFilter = #share{topic = Topic}) when + is_binary(MountPoint) andalso is_binary(Topic) +-> + TopicFilter#share{topic = unmount_maybe_share(MountPoint, Topic)}. -spec replvar(maybe(mountpoint()), map()) -> maybe(mountpoint()). replvar(undefined, _Vars) -> diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index 11f495dbd..5cf10691d 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -102,16 +102,19 @@ do_check_pub(_Flags, _Caps) -> -spec check_sub( emqx_types:clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts() ) -> ok_or_error(emqx_types:reason_code()). check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = emqx_config:get_zone_conf(Zone, [mqtt]), Flags = #{ + %% TODO: qos check + %% (max_qos_allowed, Map) -> + %% max_qos_allowed => maps:get(max_qos_allowed, Caps, 2), topic_levels => emqx_topic:levels(Topic), is_wildcard => emqx_topic:wildcard(Topic), - is_shared => maps:is_key(share, SubOpts), + is_shared => erlang:is_record(Topic, share), is_exclusive => maps:get(is_exclusive, SubOpts, false) }, do_check_sub(Flags, Caps, ClientInfo, Topic). @@ -126,13 +129,19 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) when + is_binary(Topic) +-> case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of deny -> {error, ?RC_QUOTA_EXCEEDED}; _ -> ok end; +%% for max_qos_allowed +%% see: RC_GRANTED_QOS_0, RC_GRANTED_QOS_1, RC_GRANTED_QOS_2 +%% do_check_sub(_, _) -> +%% {ok, RC}; do_check_sub(_Flags, _Caps, _, _) -> ok. diff --git a/apps/emqx/src/emqx_reason_codes.erl b/apps/emqx/src/emqx_reason_codes.erl index 77a8c1be2..543a62216 100644 --- a/apps/emqx/src/emqx_reason_codes.erl +++ b/apps/emqx/src/emqx_reason_codes.erl @@ -177,6 +177,7 @@ compat(connack, 16#9D) -> ?CONNACK_SERVER; compat(connack, 16#9F) -> ?CONNACK_SERVER; compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80; +%% TODO: 16#80(qos0) 16#81(qos1) 16#82(qos2) for mqtt-v3.1.1 compat(unsuback, _Code) -> undefined; compat(_Other, _Code) -> undefined. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8bdd47392..e79c30f4a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -259,7 +259,7 @@ destroy(Session) -> -spec subscribe( clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts(), t() ) -> @@ -279,7 +279,7 @@ subscribe(ClientInfo, TopicFilter, SubOpts, Session) -> -spec unsubscribe( clientinfo(), - emqx_types:topic(), + emqx_types:topic() | emqx_types:share(), emqx_types:subopts(), t() ) -> @@ -409,6 +409,16 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)] end. +enrich_deliver( + ClientInfo, + {deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}}, + UpgradeQoS, + Session +) -> + %% Only QoS_1 and QoS_2 messages added `redispatch_to` header + %% For QoS 0 message, send it as regular dispatch + Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg}, + enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session); enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = ?IMPL(Session):get_subscription(Topic, Session), enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e72feffd5..8279953c1 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -314,7 +314,7 @@ unsubscribe( {error, ?RC_NO_SUBSCRIPTION_EXISTED} end. --spec get_subscription(emqx_types:topic(), session()) -> +-spec get_subscription(emqx_types:topic() | emqx_types:share(), session()) -> emqx_types:subopts() | undefined. get_subscription(Topic, #session{subscriptions = Subs}) -> maps:get(Topic, Subs, undefined). diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 6d232c68d..20dfd4316 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -36,9 +36,16 @@ parse/2 ]). +-export([ + maybe_format_share/1, + get_shared_real_topic/1, + make_shared_record/2 +]). + -type topic() :: emqx_types:topic(). -type word() :: emqx_types:word(). -type words() :: emqx_types:words(). +-type share() :: emqx_types:share(). %% Guards -define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST), @@ -50,7 +57,9 @@ %%-------------------------------------------------------------------- %% @doc Is wildcard topic? --spec wildcard(topic() | words()) -> true | false. +-spec wildcard(topic() | share() | words()) -> true | false. +wildcard(#share{topic = Topic}) when is_binary(Topic) -> + wildcard(Topic); wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); wildcard([]) -> @@ -64,7 +73,7 @@ wildcard([_H | T]) -> %% @doc Match Topic name with filter. -spec match(Name, Filter) -> boolean() when - Name :: topic() | words(), + Name :: topic() | share() | words(), Filter :: topic() | words(). match(<<$$, _/binary>>, <<$+, _/binary>>) -> false; @@ -72,6 +81,8 @@ match(<<$$, _/binary>>, <<$#, _/binary>>) -> false; match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); +match(#share{} = Name, Filter) -> + match_share(Name, Filter); match([], []) -> true; match([H | T1], [H | T2]) -> @@ -87,12 +98,26 @@ match([_H1 | _], []) -> match([], [_H | _T2]) -> false. +-spec match_share(Name, Filter) -> boolean() when + Name :: share(), + Filter :: topic() | share(). +match_share(#share{topic = Name}, Filter) when is_binary(Filter) -> + %% only match real topic filter for normal topic filter. + match(words(Name), words(Filter)); +match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = Filter}) -> + %% Matching real topic filter When subed same share group. + match(words(Name), words(Filter)); +match_share(#share{}, _) -> + %% Otherwise, non-matched. + false. + -spec match_any(Name, [Filter]) -> boolean() when Name :: topic() | words(), Filter :: topic() | words(). match_any(Topic, Filters) -> lists:any(fun(Filter) -> match(Topic, Filter) end, Filters). +%% TODO: validate share topic #share{} for emqx_trace.erl %% @doc Validate topic name or filter -spec validate(topic() | {name | filter, topic()}) -> true. validate(Topic) when is_binary(Topic) -> @@ -107,7 +132,7 @@ validate(_, <<>>) -> validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> %% MQTT-5.0 [MQTT-4.7.3-3] error(topic_too_long); -validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) -> +validate(filter, SharedFilter = <>) -> validate_share(SharedFilter); validate(filter, Filter) when is_binary(Filter) -> validate2(words(Filter)); @@ -139,12 +164,12 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -validate_share(<<"$share/", Rest/binary>>) when +validate_share(<>) when Rest =:= <<>> orelse Rest =:= <<"/">> -> %% MQTT-5.0 [MQTT-4.8.2-1] error(?SHARE_EMPTY_FILTER); -validate_share(<<"$share/", Rest/binary>>) -> +validate_share(<>) -> case binary:split(Rest, <<"/">>) of %% MQTT-5.0 [MQTT-4.8.2-1] [<<>>, _] -> @@ -156,7 +181,7 @@ validate_share(<<"$share/", Rest/binary>>) -> validate_share(ShareName, Filter) end. -validate_share(_, <<"$share/", _Rest/binary>>) -> +validate_share(_, <>) -> error(?SHARE_RECURSIVELY); validate_share(ShareName, Filter) -> case binary:match(ShareName, [<<"+">>, <<"#">>]) of @@ -185,7 +210,9 @@ bin('#') -> <<"#">>; bin(B) when is_binary(B) -> B; bin(L) when is_list(L) -> list_to_binary(L). --spec levels(topic()) -> pos_integer(). +-spec levels(topic() | share()) -> pos_integer(). +levels(#share{topic = Topic}) when is_binary(Topic) -> + levels(Topic); levels(Topic) when is_binary(Topic) -> length(tokens(Topic)). @@ -197,6 +224,8 @@ tokens(Topic) -> %% @doc Split Topic Path to Words -spec words(topic()) -> words(). +words(#share{topic = Topic}) when is_binary(Topic) -> + words(Topic); words(Topic) when is_binary(Topic) -> [word(W) || W <- tokens(Topic)]. @@ -237,26 +266,29 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) -> do_join(TopicAcc, [Word | Words]) -> do_join(<>, Words). --spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}. +-spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}. parse(TopicFilter) when is_binary(TopicFilter) -> parse(TopicFilter, #{}); parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). --spec parse(topic(), map()) -> {topic(), map()}. -parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> - error({invalid_topic_filter, TopicFilter}); -parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> - error({invalid_topic_filter, TopicFilter}); -parse(<<"$queue/", TopicFilter/binary>>, Options) -> - parse(TopicFilter, Options#{share => <<"$queue">>}); -parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> +-spec parse(topic() | share(), map()) -> {topic() | share(), map()}. +%% <<"$queue/[real_topic_filter]>">> equivalent to <<"$share/$queue/[real_topic_filter]">> +%% So the head of `real_topic_filter` MUST NOT be `<<$queue>>` or `<<$share>>` +parse(#share{topic = Topic = <>}, _Options) -> + error({invalid_topic_filter, Topic}); +parse(#share{topic = Topic = <>}, _Options) -> + error({invalid_topic_filter, Topic}); +parse(<>, Options) -> + parse(#share{group = <>, topic = Topic}, Options); +parse(TopicFilter = <>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> error({invalid_topic_filter, TopicFilter}); - [ShareName, Filter] -> - case binary:match(ShareName, [<<"+">>, <<"#">>]) of - nomatch -> parse(Filter, Options#{share => ShareName}); + %% `Group` could be `$share` or `$queue` + [Group, Topic] -> + case binary:match(Group, [<<"+">>, <<"#">>]) of + nomatch -> parse(#share{group = Group, topic = Topic}, Options); _ -> error({invalid_topic_filter, TopicFilter}) end end; @@ -267,5 +299,22 @@ parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> _ -> {Topic, Options#{is_exclusive => true}} end; -parse(TopicFilter, Options) -> +parse(TopicFilter, Options) when + ?IS_TOPIC(TopicFilter) +-> {TopicFilter, Options}. + +get_shared_real_topic(#share{topic = TopicFilter}) -> + TopicFilter; +get_shared_real_topic(TopicFilter) when is_binary(TopicFilter) -> + TopicFilter. + +make_shared_record(Group, Topic) -> + #share{group = Group, topic = Topic}. + +maybe_format_share(#share{group = <>, topic = Topic}) -> + join([<>, Topic]); +maybe_format_share(#share{group = Group, topic = Topic}) -> + join([<>, Group, Topic]); +maybe_format_share(Topic) -> + join([Topic]). diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 504540cf6..dbd788c04 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -40,6 +40,10 @@ words/0 ]). +-export_type([ + share/0 +]). + -export_type([ socktype/0, sockstate/0, @@ -136,11 +140,14 @@ -type subid() :: binary() | atom(). --type group() :: binary() | undefined. +%% '_' for match spec +-type group() :: binary() | '_'. -type topic() :: binary(). -type word() :: '' | '+' | '#' | binary(). -type words() :: list(word()). +-type share() :: #share{}. + -type socktype() :: tcp | udp | ssl | proxy | atom(). -type sockstate() :: idle | running | blocked | closed. -type conninfo() :: #{ diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index a205f6fcd..da108ceef 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -299,7 +299,9 @@ t_nosub_pub(Config) when is_list(Config) -> ?assertEqual(1, emqx_metrics:val('messages.dropped')). t_shared_subscribe({init, Config}) -> - emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}), + emqx_broker:subscribe( + emqx_topic:make_shared_record(<<"group">>, <<"topic">>), <<"clientid">>, #{} + ), ct:sleep(100), Config; t_shared_subscribe(Config) when is_list(Config) -> @@ -316,7 +318,7 @@ t_shared_subscribe(Config) when is_list(Config) -> end ); t_shared_subscribe({'end', _Config}) -> - emqx_broker:unsubscribe(<<"$share/group/topic">>). + emqx_broker:unsubscribe(emqx_topic:make_shared_record(<<"group">>, <<"topic">>)). t_shared_subscribe_2({init, Config}) -> Config; @@ -723,24 +725,6 @@ t_connack_auth_error(Config) when is_list(Config) -> ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), ok. -t_handle_in_empty_client_subscribe_hook({init, Config}) -> - Hook = {?MODULE, client_subscribe_delete_all_hook, []}, - ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100), - Config; -t_handle_in_empty_client_subscribe_hook({'end', _Config}) -> - emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}), - ok; -t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) -> - {ok, C} = emqtt:start_link(), - {ok, _} = emqtt:connect(C), - try - {ok, _, RCs} = emqtt:subscribe(C, <<"t">>), - ?assertEqual([?RC_UNSPECIFIED_ERROR], RCs), - ok - after - emqtt:disconnect(C) - end. - authenticate_deny(_Credentials, _Default) -> {stop, {error, bad_username_or_password}}. @@ -800,7 +784,3 @@ recv_msgs(Count, Msgs) -> after 100 -> Msgs end. - -client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) -> - EmptyFilters = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilter], - {stop, EmptyFilters}. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 8f6a2baaa..c6b4c0518 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -456,7 +456,7 @@ t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}], {[{TopicFilter, ?RC_SUCCESS}], _Channel} = - emqx_channel:process_subscribe(TopicFilters, #{}, channel()). + emqx_channel:process_subscribe(TopicFilters, channel()). t_process_unsubscribe(_) -> ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), @@ -914,7 +914,13 @@ t_check_pub_alias(_) -> t_check_sub_authzs(_) -> emqx_config:put_zone_conf(default, [authorization, enable], true), TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, - [{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()). + SubPkt = ?SUBSCRIBE_PACKET(1, #{}, [TopicFilter]), + CheckedSubPkt = ?SUBSCRIBE_PACKET(1, #{}, [{TopicFilter, ?RC_SUCCESS}]), + Channel = channel(), + ?assertEqual( + {ok, CheckedSubPkt, Channel}, + emqx_channel:check_sub_authzs(SubPkt, Channel) + ). t_enrich_connack_caps(_) -> ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]), @@ -1061,6 +1067,7 @@ clientinfo(InitProps) -> clientid => <<"clientid">>, username => <<"username">>, is_superuser => false, + is_bridge => false, mountpoint => undefined }, InitProps diff --git a/apps/emqx/test/emqx_mountpoint_SUITE.erl b/apps/emqx/test/emqx_mountpoint_SUITE.erl index 6d065d521..0bfde981c 100644 --- a/apps/emqx/test/emqx_mountpoint_SUITE.erl +++ b/apps/emqx/test/emqx_mountpoint_SUITE.erl @@ -29,6 +29,7 @@ ). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -52,6 +53,27 @@ t_mount(_) -> mount(<<"device/1/">>, TopicFilters) ). +t_mount_share(_) -> + T = {TopicFilter, Opts} = emqx_topic:parse(<<"$share/group/topic">>), + TopicFilters = [T], + ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), + + %% should not mount share topic when make message. + Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), + + ?assertEqual( + TopicFilter, + mount(undefined, TopicFilter) + ), + ?assertEqual( + #share{group = <<"group">>, topic = <<"device/1/topic">>}, + mount(<<"device/1/">>, TopicFilter) + ), + ?assertEqual( + [{#share{group = <<"group">>, topic = <<"device/1/topic">>}, Opts}], + mount(<<"device/1/">>, TopicFilters) + ). + t_unmount(_) -> Msg = emqx_message:make(<<"clientid">>, <<"device/1/topic">>, <<"payload">>), ?assertEqual(<<"topic">>, unmount(undefined, <<"topic">>)), @@ -61,6 +83,23 @@ t_unmount(_) -> ?assertEqual(<<"device/1/topic">>, unmount(<<"device/2/">>, <<"device/1/topic">>)), ?assertEqual(Msg#message{topic = <<"device/1/topic">>}, unmount(<<"device/2/">>, Msg)). +t_unmount_share(_) -> + {TopicFilter, _Opts} = emqx_topic:parse(<<"$share/group/topic">>), + MountedTopicFilter = #share{group = <<"group">>, topic = <<"device/1/topic">>}, + + ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), + + %% should not unmount share topic when make message. + Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), + ?assertEqual( + TopicFilter, + unmount(undefined, TopicFilter) + ), + ?assertEqual( + #share{group = <<"group">>, topic = <<"topic">>}, + unmount(<<"device/1/">>, MountedTopicFilter) + ). + t_replvar(_) -> ?assertEqual(undefined, replvar(undefined, #{})), ?assertEqual( diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index 297ee7f7d..e97684b74 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -76,6 +76,8 @@ t_check_sub(_) -> ), ?assertEqual( {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}) + emqx_mqtt_caps:check_sub( + ClientInfo, #share{group = <<"group">>, topic = <<"topic">>}, SubOpts + ) ), emqx_config:put([zones], OldConf). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 4b4535cea..86887eff0 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -137,7 +137,8 @@ t_random_basic(Config) when is_list(Config) -> ClientId = <<"ClientId">>, Topic = <<"foo">>, Payload = <<"hello">>, - emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}), + Group = <<"group1">>, + emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}), MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload), %% wait for the subscription to show up ct:sleep(200), @@ -402,7 +403,7 @@ t_hash(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). -t_hash_clinetid(Config) when is_list(Config) -> +t_hash_clientid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). @@ -528,14 +529,15 @@ last_message(ExpectedPayload, Pids, Timeout) -> t_dispatch(Config) when is_list(Config) -> ok = ensure_config(random), Topic = <<"foo">>, + Group = <<"group1">>, ?assertEqual( {error, no_subscribers}, - emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) + emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}}) ), - emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}), + emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}), ?assertEqual( {ok, 1}, - emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}}) + emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}}) ). t_uncovered_func(Config) when is_list(Config) -> @@ -991,37 +993,110 @@ t_session_kicked(Config) when is_list(Config) -> ?assertEqual([], collect_msgs(0)), ok. -%% FIXME: currently doesn't work -%% t_different_groups_same_topic({init, Config}) -> -%% TestName = atom_to_binary(?FUNCTION_NAME), -%% ClientId = <>, -%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), -%% {ok, _} = emqtt:connect(C), -%% [{client, C}, {clientid, ClientId} | Config]; -%% t_different_groups_same_topic({'end', Config}) -> -%% C = ?config(client, Config), -%% emqtt:stop(C), -%% ok; -%% t_different_groups_same_topic(Config) when is_list(Config) -> -%% C = ?config(client, Config), -%% ClientId = ?config(clientid, Config), -%% %% Subscribe and unsubscribe to both $queue and $shared topics -%% Topic = <<"t/1">>, -%% SharedTopic0 = <<"$share/aa/", Topic/binary>>, -%% SharedTopic1 = <<"$share/bb/", Topic/binary>>, -%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}), -%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}), +-define(UPDATE_SUB_QOS(ConnPid, Topic, QoS), + ?assertMatch({ok, _, [QoS]}, emqtt:subscribe(ConnPid, {Topic, QoS})) +). -%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), -%% emqx:publish(Message0), -%% ?assertMatch([ {publish, #{payload := <<"hi">>}} -%% , {publish, #{payload := <<"hi">>}} -%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}), +t_different_groups_same_topic({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config]; +t_different_groups_same_topic({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_different_groups_same_topic(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic + GroupA = <<"aa">>, + GroupB = <<"bb">>, + Topic = <<"t/1">>, -%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0), -%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1), + SharedTopicGroupA = ?SHARE(GroupA, Topic), + ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2), + SharedTopicGroupB = ?SHARE(GroupB, Topic), + ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2), -%% ok. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(2, length(emqx_router:match_routes(Topic))) + end + ), + + Message0 = emqx_message:make(ClientId, ?QOS_2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}}, + {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB), + + ok. + +t_different_groups_update_subopts({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config]; +t_different_groups_update_subopts({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_different_groups_update_subopts(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic + Topic = <<"t/1">>, + GroupA = <<"aa">>, + GroupB = <<"bb">>, + SharedTopicGroupA = ?SHARE(GroupA, Topic), + SharedTopicGroupB = ?SHARE(GroupB, Topic), + + Fun = fun(Group, QoS) -> + ?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS), + ?assertMatch( + #{qos := QoS}, + emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic)) + ) + end, + + [Fun(Group, QoS) || QoS <- [?QOS_0, ?QOS_1, ?QOS_2], Group <- [GroupA, GroupB]], + + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(2, length(emqx_router:match_routes(Topic))) + end + ), + + Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}}, + {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB), + + ok. t_queue_subscription({init, Config}) -> TestName = atom_to_binary(?FUNCTION_NAME), @@ -1038,23 +1113,19 @@ t_queue_subscription({'end', Config}) -> t_queue_subscription(Config) when is_list(Config) -> C = ?config(client, Config), ClientId = ?config(clientid, Config), - %% Subscribe and unsubscribe to both $queue and $shared topics + %% Subscribe and unsubscribe to both $queue share and $share/ with same topic Topic = <<"t/1">>, QueueTopic = <<"$queue/", Topic/binary>>, SharedTopic = <<"$share/aa/", Topic/binary>>, - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}), - %% FIXME: we should actually see 2 routes, one for each group - %% ($queue and aa), but currently the latest subscription - %% overwrites the existing one. + ?UPDATE_SUB_QOS(C, QueueTopic, ?QOS_2), + ?UPDATE_SUB_QOS(C, SharedTopic, ?QOS_2), + ?retry( _Sleep0 = 100, _Attempts0 = 50, begin - ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), - %% FIXME: should ensure we have 2 subscriptions - [_] = emqx_router:lookup_routes(Topic) + ?assertEqual(2, length(emqx_router:match_routes(Topic))) end ), @@ -1063,37 +1134,29 @@ t_queue_subscription(Config) when is_list(Config) -> emqx:publish(Message0), ?assertMatch( [ + {publish, #{payload := <<"hi">>}}, {publish, #{payload := <<"hi">>}} - %% FIXME: should receive one message from each group - %% , {publish, #{payload := <<"hi">>}} ], - collect_msgs(5_000) + collect_msgs(5_000), + #{routes => ets:tab2list(emqx_route)} ), {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic), - %% FIXME: return code should be success instead of 17 ("no_subscription_existed") - {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic), + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopic), - %% FIXME: this should eventually be true, but currently we leak - %% the previous group subscription... - %% ?retry( - %% _Sleep0 = 100, - %% _Attempts0 = 50, - %% begin - %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), - %% [] = emqx_router:lookup_routes(Topic) - %% end - %% ), + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ?assertEqual(0, length(emqx_router:match_routes(Topic))) + end + ), ct:sleep(500), Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>), emqx:publish(Message1), - %% FIXME: we should *not* receive any messages... - %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), - %% This is from the leaked group... - ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{ - routes => ets:tab2list(emqx_route) - }), + %% we should *not* receive any messages. + ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), ok. diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index c49c93fb2..4761ea17d 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -238,11 +238,11 @@ long_topic() -> t_parse(_) -> ?assertError( {invalid_topic_filter, <<"$queue/t">>}, - parse(<<"$queue/t">>, #{share => <<"g">>}) + parse(#share{group = <<"$queue">>, topic = <<"$queue/t">>}, #{}) ), ?assertError( {invalid_topic_filter, <<"$share/g/t">>}, - parse(<<"$share/g/t">>, #{share => <<"g">>}) + parse(#share{group = <<"g">>, topic = <<"$share/g/t">>}, #{}) ), ?assertError( {invalid_topic_filter, <<"$share/t">>}, @@ -254,8 +254,12 @@ t_parse(_) -> ), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), - ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), - ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)), + ?assertEqual( + {#share{group = <<"$queue">>, topic = <<"topic">>}, #{}}, parse(<<"$queue/topic">>) + ), + ?assertEqual( + {#share{group = <<"group">>, topic = <<"topic">>}, #{}}, parse(<<"$share/group/topic">>) + ), %% The '$local' and '$fastlane' topics have been deprecated. ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)), ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index eb81c4b6e..0f21a2593 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -96,7 +96,7 @@ choose_ingress_pool_size( #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} ) -> case emqx_topic:parse(RemoteTopic) of - {_Filter, #{share := _Name}} -> + {#share{} = _Filter, _SubOpts} -> % NOTE: this is shared subscription, many workers may subscribe PoolSize; {_Filter, #{}} when PoolSize > 1 -> diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index b4358969d..64061de3d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) -> Req = #{ clientinfo => clientinfo(ClientInfo), type => Type, - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), result => Bool }, case @@ -191,7 +191,7 @@ on_session_created(ClientInfo, _SessInfo) -> on_session_subscribed(ClientInfo, Topic, SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), subopts => maps:with([qos, share, rh, rap, nl], SubOpts) }, cast('session.subscribed', Req). @@ -199,7 +199,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) -> on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), - topic => Topic + topic => emqx_topic:maybe_format_share(Topic) }, cast('session.unsubscribed', Req). @@ -413,7 +413,13 @@ enrich_header(Headers, Message) -> end. topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + GetQos = fun(SubOpts) -> + maps:get(qos, SubOpts, 0) + end, + [ + #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)} + || {Topic, SubOpts} <- Tfs + ]. ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) -> list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 2c9b5bb06..10462d210 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -547,6 +547,7 @@ subopts(SubOpts) -> rh => maps:get(rh, SubOpts, 0), rap => maps:get(rap, SubOpts, 0), nl => maps:get(nl, SubOpts, 0), + %% TOOD: FIXME for share-sub refactored share => maps:get(share, SubOpts, <<>>) }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 6b4793b1c..a0436298e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -629,15 +629,8 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> {200, []}; {Node, Subs} -> Formatter = - fun({Topic, SubOpts}) -> - maps:merge( - #{ - node => Node, - clientid => ClientID, - topic => Topic - }, - maps:with([qos, nl, rap, rh], SubOpts) - ) + fun(_Sub = {Topic, SubOpts}) -> + emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts}) end, {200, lists:map(Formatter, Subs)} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index e6e8bb475..d10c9d068 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -38,8 +39,6 @@ format/2 ]). --define(SUBS_QTABLE, emqx_suboption). - -define(SUBS_QSCHEMA, [ {<<"clientid">>, binary}, {<<"topic">>, binary}, @@ -146,7 +145,7 @@ subscriptions(get, #{query_string := QString}) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - ?SUBS_QTABLE, + ?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -157,7 +156,7 @@ subscriptions(get, #{query_string := QString}) -> {ok, Node1} -> emqx_mgmt_api:node_query( Node1, - ?SUBS_QTABLE, + ?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -177,23 +176,16 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(WhichNode, {{Topic, _Subscriber}, Options}) -> +format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> maps:merge( #{ - topic => get_topic(Topic, Options), - clientid => maps:get(subid, Options, null), + topic => emqx_topic:maybe_format_share(Topic), + clientid => maps:get(subid, SubOpts, null), node => WhichNode }, - maps:with([qos, nl, rap, rh], Options) + maps:with([qos, nl, rap, rh], SubOpts) ). -get_topic(Topic, #{share := <<"$queue">> = Group}) -> - emqx_topic:join([Group, Topic]); -get_topic(Topic, #{share := Group}) -> - emqx_topic:join([<<"$share">>, Group, Topic]); -get_topic(Topic, _) -> - Topic. - %%-------------------------------------------------------------------- %% QueryString to MatchSpec %%-------------------------------------------------------------------- @@ -213,10 +205,18 @@ gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> update_ms(clientid, X, {{Topic, Pid}, Opts}) -> {{Topic, Pid}, Opts#{subid => X}}; -update_ms(topic, X, {{_Topic, Pid}, Opts}) -> +update_ms(topic, X, {{Topic, Pid}, Opts}) when + is_record(Topic, share) +-> + {{#share{group = '_', topic = X}, Pid}, Opts}; +update_ms(topic, X, {{Topic, Pid}, Opts}) when + is_binary(Topic) orelse Topic =:= '_' +-> {{X, Pid}, Opts}; -update_ms(share_group, X, {{Topic, Pid}, Opts}) -> - {{Topic, Pid}, Opts#{share => X}}; +update_ms(share_group, X, {{Topic, Pid}, Opts}) when + not is_record(Topic, share) +-> + {{#share{group = X, topic = Topic}, Pid}, Opts}; update_ms(qos, X, {{Topic, Pid}, Opts}) -> {{Topic, Pid}, Opts#{qos => X}}. @@ -227,5 +227,6 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; -run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> - emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). +run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> + {Filter, _SubOpts} = emqx_topic:parse(TopicFilter), + emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index b6294ecbf..94bedd39f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -17,6 +17,7 @@ -module(emqx_mgmt_api_topics). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -101,10 +102,10 @@ fields(topic) -> %%%============================================================================================== %% parameters trans topics(get, #{query_string := Qs}) -> - do_list(generate_topic(Qs)). + do_list(Qs). topic(get, #{bindings := Bindings}) -> - lookup(generate_topic(Bindings)). + lookup(Bindings). %%%============================================================================================== %% api apply @@ -139,13 +140,6 @@ lookup(#{topic := Topic}) -> %%%============================================================================================== %% internal -generate_topic(Params = #{<<"topic">> := Topic}) -> - Params#{<<"topic">> => Topic}; -generate_topic(Params = #{topic := Topic}) -> - Params#{topic => Topic}; -generate_topic(Params) -> - Params. - -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). qs2ms(_Tab, {Qs, _}) -> #{ @@ -160,9 +154,9 @@ gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). -format(#route{topic = Topic, dest = {_, Node}}) -> - #{topic => Topic, node => Node}; -format(#route{topic = Topic, dest = Node}) -> +format(#route{topic = Topic, dest = {Group, Node}}) -> + #{topic => ?SHARE(Group, Topic), node => Node}; +format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> #{topic => Topic, node => Node}. topic_param(In) -> diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index d91371f7e..619b53be4 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -150,11 +150,12 @@ compile(Rules) -> Rules ). +%% FIXME: rewrite #share{} and return #share{}, not formated $share/group/topic match_and_rewrite(Topic, [], _) -> Topic; match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest, Binds); + true -> rewrite(emqx_topic:get_shared_real_topic(Topic), MP, Dest, Binds); false -> match_and_rewrite(Topic, Rules, Binds) end. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 4976e2400..49831a9e8 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -20,6 +20,7 @@ -include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -export([start_link/0]). @@ -87,7 +88,7 @@ %% Hook API %%------------------------------------------------------------------------------ -spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any(). -on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined -> +on_session_subscribed(_, #share{} = _Topic, _SubOpts, _) -> ok; on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> IsNew = maps:get(is_new, Opts, true), diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 3ff588f48..ac9e077eb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -190,7 +190,9 @@ on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.subscribed', fun() -> - eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) + eventmsg_sub_or_unsub( + 'session.subscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts + ) end, Conf ). @@ -199,7 +201,9 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) -> apply_event( 'session.unsubscribed', fun() -> - eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) + eventmsg_sub_or_unsub( + 'session.unsubscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts + ) end, Conf ). diff --git a/changes/ce/fix-10976.en.md b/changes/ce/fix-10976.en.md new file mode 100644 index 000000000..87a7b442a --- /dev/null +++ b/changes/ce/fix-10976.en.md @@ -0,0 +1,2 @@ +Fix topic-filter overlapping handling in shared subscription. +In the previous implementation, the storage method for subscription options did not provide adequate support for shared subscriptions. This resulted in message routing failures and leakage of routing tables between nodes during the "subscribe-unsubscribe" process with specific order and topics. From 0ca725ff25841938afc3726c5b2cf7b4592ef9b1 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 15 Aug 2023 11:22:55 +0800 Subject: [PATCH 2/9] fix: deprecated `share` in `subopts` --- apps/emqx/src/emqx_types.erl | 1 - apps/emqx_exhook/priv/protos/exhook.proto | 5 ++++- apps/emqx_exhook/src/emqx_exhook_handler.erl | 2 +- apps/emqx_exhook/test/props/prop_exhook_hooks.erl | 4 +--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index dbd788c04..1a4825736 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -214,7 +214,6 @@ rap := 0 | 1, nl := 0 | 1, qos := qos(), - share => binary(), atom() => term() }. -type reason_code() :: 0..16#FF. diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 928e9b20b..e5d7b3606 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -460,8 +460,11 @@ message SubOpts { // The QoS level uint32 qos = 1; + // deprecated + reserved 2; + reserved "share"; // The group name for shared subscription - string share = 2; + // string share = 2; // The Retain Handling option (MQTT v5.0) // diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 64061de3d..2bcb91b12 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -192,7 +192,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), topic => emqx_topic:maybe_format_share(Topic), - subopts => maps:with([qos, share, rh, rap, nl], SubOpts) + subopts => maps:with([qos, rh, rap, nl], SubOpts) }, cast('session.subscribed', Req). diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 10462d210..cf48fff80 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -546,9 +546,7 @@ subopts(SubOpts) -> qos => maps:get(qos, SubOpts, 0), rh => maps:get(rh, SubOpts, 0), rap => maps:get(rap, SubOpts, 0), - nl => maps:get(nl, SubOpts, 0), - %% TOOD: FIXME for share-sub refactored - share => maps:get(share, SubOpts, <<>>) + nl => maps:get(nl, SubOpts, 0) }. authresult_to_bool(AuthResult) -> From eaa5459509468a881a676334e199b86d0274eda2 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 8 Aug 2023 14:20:22 +0800 Subject: [PATCH 3/9] chore: bump apps vsn --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 8a57249e9..79c34e36b 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [ {description, "EMQX Extension for Hook"}, - {vsn, "5.0.14"}, + {vsn, "5.0.15"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, From 9732e31395a48bba236f13805468d12497ef2115 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 11 Oct 2023 11:23:41 +0800 Subject: [PATCH 4/9] chore: highlight breaking changes --- changes/ce/fix-10976.en.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/changes/ce/fix-10976.en.md b/changes/ce/fix-10976.en.md index 87a7b442a..f2f15d4c8 100644 --- a/changes/ce/fix-10976.en.md +++ b/changes/ce/fix-10976.en.md @@ -1,2 +1,9 @@ Fix topic-filter overlapping handling in shared subscription. In the previous implementation, the storage method for subscription options did not provide adequate support for shared subscriptions. This resulted in message routing failures and leakage of routing tables between nodes during the "subscribe-unsubscribe" process with specific order and topics. + +## Breaking changes +* Hook callback `session.subscribed` and `client.subscribe` will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#`, and the `share` property is deleted from `subopts`. +* Hook callback `session.unsubscribed` and `client.unsubscribe` will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#` instead of just `topic1/#`. +* ExHook Proto changed. The `share` field in message `SubOpts` was deprecated. + ExHook Server will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#`, and the `share` property is deleted from message `SubOpts`. +* `session.subscribed` and `session.unsubscribed` rule-engine events will have shared subscriptions in their full representation for `topic`, e.g. `$share/group1/topic1/#` instead of just `topic1/#`. From 802a36c67020369e5d3a1020433bfa6303a14a5b Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 25 Oct 2023 16:13:45 +0800 Subject: [PATCH 5/9] fix: find SubOpts by shared_record, not deliver topic --- apps/emqx/src/emqx_session.erl | 18 +++++++----------- apps/emqx/src/emqx_shared_sub.erl | 12 ++++++++---- apps/emqx/test/emqx_broker_SUITE.erl | 5 ++++- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index e79c30f4a..c31efb0a6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -409,18 +409,14 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)] end. -enrich_deliver( - ClientInfo, - {deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}}, - UpgradeQoS, - Session -) -> - %% Only QoS_1 and QoS_2 messages added `redispatch_to` header - %% For QoS 0 message, send it as regular dispatch - Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg}, - enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session); enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> - SubOpts = ?IMPL(Session):get_subscription(Topic, Session), + SubOpts = + case Msg of + #message{headers = #{shared_record := SharedRecord}} -> + ?IMPL(Session):get_subscription(SharedRecord, Session); + _ -> + ?IMPL(Session):get_subscription(Topic, Session) + end, enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). enrich_message( diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 84921be6b..c5ee9e7ab 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -141,14 +141,15 @@ record(Group, Topic, SubPid) -> dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg, +dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg0, + Msg1 = with_shared_record(Msg0, Group, Topic), case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> - Msg1 = with_redispatch_to(Msg, Group, Topic), - case do_dispatch(SubPid, Group, Topic, Msg1, Type) of + Msg2 = with_redispatch_to(Msg1, Group, Topic), + case do_dispatch(SubPid, Group, Topic, Msg2, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -239,6 +240,9 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> with_redispatch_to(Msg, Group, Topic) -> emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). +with_shared_record(Msg, Group, Topic) -> + emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg). + %% @hidden Redispatch is needed only for the messages with redispatch_to header added. is_redispatch_needed(#message{} = Msg) -> case get_redispatch_to(Msg) of diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index da108ceef..2bc6bd8ea 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -308,7 +308,10 @@ t_shared_subscribe(Config) when is_list(Config) -> emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert( receive - {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> + {deliver, <<"topic">>, #message{ + headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}}, + payload = <<"hello">> + }} -> true; Msg -> ct:pal("Msg: ~p", [Msg]), From afec6fa2f6c547c52479332df1d92703925a0e8d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 25 Oct 2023 17:12:28 +0800 Subject: [PATCH 6/9] fix: TopicFilter may modified by `client.subscribe` hook --- apps/emqx/src/emqx_channel.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 61b31c6e1..81e01e1bd 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -796,19 +796,19 @@ do_gen_reason_codes(Acc, [], []) -> lists:reverse(Acc); do_gen_reason_codes( Acc, - [{_TF, ?RC_SUCCESS} | RestCheckedTF], - [{_TF, NRC} | RestTFWithNRC] + [{_, ?RC_SUCCESS} | RestTF], + [{_, NRC} | RestWithNRC] ) -> %% will passing through `process_subscribe/2` %% use NRC to override IintialRC - do_gen_reason_codes([NRC | Acc], RestCheckedTF, RestTFWithNRC); + do_gen_reason_codes([NRC | Acc], RestTF, RestWithNRC); do_gen_reason_codes( Acc, - [{_TF, InitialRC} | RestChecked], - RestTFWithNRC + [{_, InitialRC} | Rest], + RestWithNRC ) -> %% InitialRC is not `RC_SUCCESS`, use it. - do_gen_reason_codes([InitialRC | Acc], RestChecked, RestTFWithNRC). + do_gen_reason_codes([InitialRC | Acc], Rest, RestWithNRC). %%-------------------------------------------------------------------- %% Process Unsubscribe From 53383991d9a065b20c0331f1e7168c89ae7a0326 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 25 Oct 2023 17:25:23 +0800 Subject: [PATCH 7/9] fix: rewrite #share{} and return #share{}, not formated $share/group/topic --- apps/emqx_modules/src/emqx_rewrite.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 619b53be4..485c41f29 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -150,15 +150,16 @@ compile(Rules) -> Rules ). -%% FIXME: rewrite #share{} and return #share{}, not formated $share/group/topic match_and_rewrite(Topic, [], _) -> Topic; match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(emqx_topic:get_shared_real_topic(Topic), MP, Dest, Binds); + true -> rewrite(Topic, MP, Dest, Binds); false -> match_and_rewrite(Topic, Rules, Binds) end. +rewrite(SharedRecord = #share{topic = Topic}, MP, Dest, Binds) -> + SharedRecord#share{topic = rewrite(Topic, MP, Dest, Binds)}; rewrite(Topic, MP, Dest, Binds) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> From 3b5cc912e7c5ed4e11d2e02d684ab5dcf4f8a726 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 27 Oct 2023 16:15:23 +0800 Subject: [PATCH 8/9] fix: add `redispatch_to` header to all msgs when deliver shared-sub - to find correct SubOpts for shared-sub dispatch - use previous key `redispatch_to` to ensure rolling upgrade compatibility --- apps/emqx/src/emqx_session.erl | 4 ++-- apps/emqx/src/emqx_shared_sub.erl | 29 +++++++++++----------------- apps/emqx/test/emqx_broker_SUITE.erl | 2 +- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index c31efb0a6..f5157aaf3 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -412,8 +412,8 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = case Msg of - #message{headers = #{shared_record := SharedRecord}} -> - ?IMPL(Session):get_subscription(SharedRecord, Session); + #message{headers = #{redispatch_to := {Group, T}}} -> + ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session); _ -> ?IMPL(Session):get_subscription(Topic, Session) end, diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index c5ee9e7ab..691ab7497 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -141,15 +141,14 @@ record(Group, Topic, SubPid) -> dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg0, - Msg1 = with_shared_record(Msg0, Group, Topic), +dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg, case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> - Msg2 = with_redispatch_to(Msg1, Group, Topic), - case do_dispatch(SubPid, Group, Topic, Msg2, Type) of + Msg1 = with_redispatch_to(Msg, Group, Topic), + case do_dispatch(SubPid, Group, Topic, Msg1, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -235,22 +234,16 @@ without_group_ack(Msg) -> get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). -with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> - Msg; +%% always add `redispatch_to` header to the message +%% for QOS_0 msgs, redispatch_to is not needed and filtered out in is_redispatch_needed/1 with_redispatch_to(Msg, Group, Topic) -> emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). -with_shared_record(Msg, Group, Topic) -> - emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg). - -%% @hidden Redispatch is needed only for the messages with redispatch_to header added. -is_redispatch_needed(#message{} = Msg) -> - case get_redispatch_to(Msg) of - ?REDISPATCH_TO(_, _) -> - true; - _ -> - false - end. +%% @hidden Redispatch is needed only for the messages which not QOS_0 +is_redispatch_needed(#message{qos = ?QOS_0}) -> + false; +is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) -> + true. %% @doc Redispatch shared deliveries to other members in the group. redispatch(Messages0) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 2bc6bd8ea..18d9f9651 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) -> ?assert( receive {deliver, <<"topic">>, #message{ - headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}}, + headers = #{redispatch_to := {<<"group">>, <<"topic">>}}, payload = <<"hello">> }} -> true; From d563121284d63fd0ff79873223ec6308a09ee607 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 27 Oct 2023 17:54:14 +0800 Subject: [PATCH 9/9] refactor: move ?REDISPATCH_TO macro to emqx_mqtt.hrl --- apps/emqx/include/emqx_mqtt.hrl | 2 ++ apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_shared_sub.erl | 1 - apps/emqx/test/emqx_broker_SUITE.erl | 2 +- apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl | 3 ++- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 93c70a6e1..53fed0f9d 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -675,6 +675,8 @@ end). -define(QUEUE, "$queue"). -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). +-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). + -define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). -define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). -define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter'). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index f5157aaf3..147b0b35c 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -412,7 +412,7 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = case Msg of - #message{headers = #{redispatch_to := {Group, T}}} -> + #message{headers = #{redispatch_to := ?REDISPATCH_TO(Group, T)}} -> ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session); _ -> ?IMPL(Session):get_subscription(Topic, Session) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 691ab7497..89a785590 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -95,7 +95,6 @@ -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). --define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -define(SUBSCRIBER_DOWN, noproc). -type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()). diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 18d9f9651..b416f1730 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) -> ?assert( receive {deliver, <<"topic">>, #message{ - headers = #{redispatch_to := {<<"group">>, <<"topic">>}}, + headers = #{redispatch_to := ?REDISPATCH_TO(<<"group">>, <<"topic">>)}, payload = <<"hello">> }} -> true; diff --git a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl index f8fe49ca8..fee112d9a 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -42,7 +43,7 @@ t_printable_maps(_) -> peerhost => {127, 0, 0, 1}, peername => {{127, 0, 0, 1}, 9980}, sockname => {{127, 0, 0, 1}, 1883}, - redispatch_to => {<<"group">>, <<"sub/topic/+">>}, + redispatch_to => ?REDISPATCH_TO(<<"group">>, <<"sub/topic/+">>), shared_dispatch_ack => {self(), ref} }, Converted = emqx_rule_events:printable_maps(Headers),