refactor: subscribe process to fix shared-sub

This commit is contained in:
JimMoen 2023-06-07 10:22:14 +08:00
parent efd0cfbda9
commit b5411da770
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
28 changed files with 636 additions and 343 deletions

View File

@ -77,7 +77,7 @@ EMQX Cloud 文档:[docs.emqx.com/zh/cloud/latest/](https://docs.emqx.com/zh/cl
优雅的跨平台 MQTT 5.0 客户端工具提供了桌面端、命令行、Web 三种版本,帮助您更快的开发和调试 MQTT 服务和应用。
- [车联网平台搭建从入门到精通 ](https://www.emqx.com/zh/blog/category/internet-of-vehicles)
- [车联网平台搭建从入门到精通](https://www.emqx.com/zh/blog/category/internet-of-vehicles)
结合 EMQ 在车联网领域的实践经验,从协议选择等理论知识,到平台架构设计等实战操作,分享如何搭建一个可靠、高效、符合行业场景需求的车联网平台。

View File

@ -39,9 +39,6 @@
%% System topic
-define(SYSTOP, <<"$SYS/">>).
%% Queue topic
-define(QUEUE, <<"$queue/">>).
%%--------------------------------------------------------------------
%% alarms
%%--------------------------------------------------------------------

View File

@ -55,6 +55,17 @@
%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3]
-define(MAX_TOPIC_LEN, 65535).
%%--------------------------------------------------------------------
%% MQTT Share-Sub Internal
%%--------------------------------------------------------------------
-record(share, {group :: emqx_types:group(), topic :: emqx_types:topic()}).
%% guards
-define(IS_TOPIC(T),
(is_binary(T) orelse is_record(T, share))
).
%%--------------------------------------------------------------------
%% MQTT QoS Levels
%%--------------------------------------------------------------------
@ -661,13 +672,8 @@ end).
-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}).
-define(SHARE, "$share").
-define(QUEUE, "$queue").
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
-define(IS_SHARE(Topic),
case Topic of
<<?SHARE, _/binary>> -> true;
_ -> false
end
).
-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).

View File

@ -118,18 +118,20 @@ create_tabs() ->
%% Subscribe API
%%------------------------------------------------------------------------------
-spec subscribe(emqx_types:topic()) -> ok.
subscribe(Topic) when is_binary(Topic) ->
-spec subscribe(emqx_types:topic() | emqx_types:share()) -> ok.
subscribe(Topic) when ?IS_TOPIC(Topic) ->
subscribe(Topic, undefined).
-spec subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok.
subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) ->
-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid() | emqx_types:subopts()) ->
ok.
subscribe(Topic, SubId) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId) ->
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
subscribe(Topic, SubOpts) when ?IS_TOPIC(Topic), is_map(SubOpts) ->
subscribe(Topic, undefined, SubOpts).
-spec subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok.
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid(), emqx_types:subopts()) ->
ok.
subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
_ = emqx_trace:subscribe(Topic, SubId, SubOpts),
SubPid = self(),
@ -151,13 +153,13 @@ with_subid(undefined, SubOpts) ->
with_subid(SubId, SubOpts) ->
maps:put(subid, SubId, SubOpts).
%% @private
do_subscribe(Topic, SubPid, SubOpts) ->
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined),
do_subscribe(Group, Topic, SubPid, SubOpts).
do_subscribe2(Topic, SubPid, SubOpts).
do_subscribe(undefined, Topic, SubPid, SubOpts) ->
do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) ->
%% FIXME: subscribe shard bug
%% https://emqx.atlassian.net/browse/EMQX-10214
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
0 ->
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
@ -168,34 +170,40 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) ->
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
call(pick({Topic, I}), {subscribe, Topic, I})
end;
%% Shared subscription
do_subscribe(Group, Topic, SubPid, SubOpts) ->
do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
is_binary(RealTopic)
->
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
emqx_shared_sub:subscribe(Group, Topic, SubPid).
emqx_shared_sub:subscribe(Group, RealTopic, SubPid).
%%--------------------------------------------------------------------
%% Unsubscribe API
%%--------------------------------------------------------------------
-spec unsubscribe(emqx_types:topic()) -> ok.
unsubscribe(Topic) when is_binary(Topic) ->
-spec unsubscribe(emqx_types:topic() | emqx_types:share()) -> ok.
unsubscribe(Topic) when ?IS_TOPIC(Topic) ->
SubPid = self(),
case ets:lookup(?SUBOPTION, {Topic, SubPid}) of
[{_, SubOpts}] ->
_ = emqx_broker_helper:reclaim_seq(Topic),
_ = emqx_trace:unsubscribe(Topic, SubOpts),
do_unsubscribe(Topic, SubPid, SubOpts);
[] ->
ok
end.
-spec do_unsubscribe(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) ->
ok.
do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {Topic, SubPid}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts).
do_unsubscribe2(Topic, SubPid, SubOpts).
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
-spec do_unsubscribe2(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) ->
ok.
do_unsubscribe2(Topic, SubPid, SubOpts) when
is_binary(Topic), is_pid(SubPid), is_map(SubOpts)
->
_ = emqx_broker_helper:reclaim_seq(Topic),
case maps:get(shard, SubOpts, 0) of
0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
@ -205,7 +213,9 @@ do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
cast(pick({Topic, I}), {unsubscribed, Topic, I})
end;
do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
is_binary(Group), is_binary(Topic), is_pid(SubPid)
->
emqx_shared_sub:unsubscribe(Group, Topic, SubPid).
%%--------------------------------------------------------------------
@ -306,7 +316,9 @@ aggre([], true, Acc) ->
lists:usort(Acc).
%% @doc Forward message to another node.
-spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) ->
-spec forward(
node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async
) ->
emqx_types:deliver_result().
forward(Node, To, Delivery, async) ->
true = emqx_broker_proto_v1:forward_async(Node, To, Delivery),
@ -329,7 +341,8 @@ forward(Node, To, Delivery, sync) ->
Result
end.
-spec dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
-spec dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
emqx_types:deliver_result().
dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->
case emqx:is_running() of
true ->
@ -353,7 +366,11 @@ inc_dropped_cnt(Msg) ->
end.
-compile({inline, [subscribers/1]}).
-spec subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()}) ->
-spec subscribers(
emqx_types:topic()
| emqx_types:share()
| {shard, emqx_types:topic() | emqx_types:share(), non_neg_integer()}
) ->
[pid()].
subscribers(Topic) when is_binary(Topic) ->
lookup_value(?SUBSCRIBER, Topic, []);
@ -372,7 +389,7 @@ subscriber_down(SubPid) ->
SubOpts when is_map(SubOpts) ->
_ = emqx_broker_helper:reclaim_seq(Topic),
true = ets:delete(?SUBOPTION, {Topic, SubPid}),
do_unsubscribe(undefined, Topic, SubPid, SubOpts);
do_unsubscribe2(Topic, SubPid, SubOpts);
undefined ->
ok
end
@ -386,7 +403,7 @@ subscriber_down(SubPid) ->
%%--------------------------------------------------------------------
-spec subscriptions(pid() | emqx_types:subid()) ->
[{emqx_types:topic(), emqx_types:subopts()}].
[{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}].
subscriptions(SubPid) when is_pid(SubPid) ->
[
{Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})}
@ -400,20 +417,22 @@ subscriptions(SubId) ->
[]
end.
-spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()].
-spec subscriptions_via_topic(emqx_types:topic() | emqx_types:share()) -> [emqx_types:subopts()].
subscriptions_via_topic(Topic) ->
MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}],
ets:select(?SUBOPTION, MatchSpec).
-spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean().
-spec subscribed(
pid() | emqx_types:subid(), emqx_types:topic() | emqx_types:share()
) -> boolean().
subscribed(SubPid, Topic) when is_pid(SubPid) ->
ets:member(?SUBOPTION, {Topic, SubPid});
subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
SubPid = emqx_broker_helper:lookup_subpid(SubId),
ets:member(?SUBOPTION, {Topic, SubPid}).
-spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
-spec get_subopts(pid(), emqx_types:topic() | emqx_types:share()) -> maybe(emqx_types:subopts()).
get_subopts(SubPid, Topic) when is_pid(SubPid), ?IS_TOPIC(Topic) ->
lookup_value(?SUBOPTION, {Topic, SubPid});
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
case emqx_broker_helper:lookup_subpid(SubId) of
@ -423,7 +442,7 @@ get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
undefined
end.
-spec set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean().
-spec set_subopts(emqx_types:topic() | emqx_types:share(), emqx_types:subopts()) -> boolean().
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
set_subopts(self(), Topic, NewOpts).
@ -437,7 +456,7 @@ set_subopts(SubPid, Topic, NewOpts) ->
false
end.
-spec topics() -> [emqx_types:topic()].
-spec topics() -> [emqx_types:topic() | emqx_types:share()].
topics() ->
emqx_router:topics().
@ -542,7 +561,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
-spec do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
-spec do_dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
emqx_types:deliver_result().
do_dispatch(Topic, #delivery{message = Msg}) ->
DispN = lists:foldl(
fun(Sub, N) ->
@ -560,6 +580,8 @@ do_dispatch(Topic, #delivery{message = Msg}) ->
{ok, DispN}
end.
%% Donot dispatch to share subscriber here.
%% we do it in `emqx_shared_sub.erl` with configured strategy
do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
case erlang:is_process_alive(SubPid) of
true ->

View File

@ -476,60 +476,27 @@ handle_in(
ok = emqx_metrics:inc('packets.pubcomp.missed'),
{ok, Channel}
end;
handle_in(
SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo}
) ->
case emqx_packet:check(SubPkt) of
ok ->
TopicFilters0 = parse_topic_filters(TopicFilters),
TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
HasAuthzDeny = lists:any(
fun({_TopicFilter, ReasonCode}) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
end,
TupleTopicFilters0
),
DenyAction = emqx:get_config([authorization, deny_action], ignore),
case DenyAction =:= disconnect andalso HasAuthzDeny of
true ->
handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false ->
TopicFilters2 = [
TopicFilter
|| {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0
],
TopicFilters3 = run_hooks(
'client.subscribe',
[ClientInfo, Properties],
TopicFilters2
),
{TupleTopicFilters1, NChannel} = process_subscribe(
TopicFilters3,
Properties,
Channel
),
TupleTopicFilters2 =
lists:foldl(
fun
({{Topic, Opts = #{deny_subscription := true}}, _QoS}, Acc) ->
Key = {Topic, maps:without([deny_subscription], Opts)},
lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR});
(Tuple = {Key, _Value}, Acc) ->
lists:keyreplace(Key, 1, Acc, Tuple)
end,
TupleTopicFilters0,
TupleTopicFilters1
),
ReasonCodes2 = [
ReasonCode
|| {_TopicFilter, ReasonCode} <- TupleTopicFilters2
],
handle_out(suback, {PacketId, ReasonCodes2}, NChannel)
end;
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
handle_in(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) ->
Pipe = pipeline(
[
fun check_subscribe/2,
fun enrich_subscribe/2,
%% TODO && FIXME (EMQX-10786): mount topic before authz check.
fun check_sub_authzs/2,
fun check_sub_caps/2
],
SubPkt,
Channel0
),
case Pipe of
{ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} ->
{TFSubedWithNRC, NChannel} = process_subscribe(run_sub_hooks(NPkt, Channel), Channel),
ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC),
handle_out(suback, {PacketId, ReasonCodes}, NChannel);
{error, {disconnect, RC}, Channel} ->
%% funcs in pipeline always cause action: `disconnect`
%% And Only one ReasonCode in DISCONNECT packet
handle_out(disconnect, RC, Channel)
end;
handle_in(
Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
@ -540,7 +507,7 @@ handle_in(
TopicFilters1 = run_hooks(
'client.unsubscribe',
[ClientInfo, Properties],
parse_topic_filters(TopicFilters)
parse_raw_topic_filters(TopicFilters)
),
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
@ -782,32 +749,14 @@ after_message_acked(ClientInfo, Msg, PubAckProps) ->
%% Process Subscribe
%%--------------------------------------------------------------------
-compile({inline, [process_subscribe/3]}).
process_subscribe(TopicFilters, SubProps, Channel) ->
process_subscribe(TopicFilters, SubProps, Channel, []).
process_subscribe(TopicFilters, Channel) ->
process_subscribe(TopicFilters, Channel, []).
process_subscribe([], _SubProps, Channel, Acc) ->
process_subscribe([], Channel, Acc) ->
{lists:reverse(Acc), Channel};
process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) ->
case check_sub_caps(TopicFilter, SubOpts, Channel) of
ok ->
{ReasonCode, NChannel} = do_subscribe(
TopicFilter,
SubOpts#{sub_props => SubProps},
Channel
),
process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]);
{error, ReasonCode} ->
?SLOG(
warning,
#{
msg => "cannot_subscribe_topic_filter",
reason => emqx_reason_codes:name(ReasonCode)
},
#{topic => TopicFilter}
),
process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
end.
process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) ->
{NReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]).
do_subscribe(
TopicFilter,
@ -818,11 +767,13 @@ do_subscribe(
session = Session
}
) ->
%% TODO && FIXME (EMQX-10786): mount topic before authz check.
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
case emqx_session:subscribe(ClientInfo, NTopicFilter, SubOpts, Session) of
{ok, NSession} ->
{QoS, Channel#channel{session = NSession}};
%% TODO && FIXME (EMQX-11216): QoS as ReasonCode(max granted QoS) for now
RC = QoS,
{RC, Channel#channel{session = NSession}};
{error, RC} ->
?SLOG(
warning,
@ -835,6 +786,30 @@ do_subscribe(
{RC, Channel}
end.
gen_reason_codes(TFChecked, TFSubedWitNhRC) ->
do_gen_reason_codes([], TFChecked, TFSubedWitNhRC).
%% Initial RC is `RC_SUCCESS | RC_NOT_AUTHORIZED`, generated by check_sub_authzs/2
%% And then TF with `RC_SUCCESS` will passing through `process_subscribe/2` and
%% NRC should override the initial RC.
do_gen_reason_codes(Acc, [], []) ->
lists:reverse(Acc);
do_gen_reason_codes(
Acc,
[{_TF, ?RC_SUCCESS} | RestCheckedTF],
[{_TF, NRC} | RestTFWithNRC]
) ->
%% will passing through `process_subscribe/2`
%% use NRC to override IintialRC
do_gen_reason_codes([NRC | Acc], RestCheckedTF, RestTFWithNRC);
do_gen_reason_codes(
Acc,
[{_TF, InitialRC} | RestChecked],
RestTFWithNRC
) ->
%% InitialRC is not `RC_SUCCESS`, use it.
do_gen_reason_codes([InitialRC | Acc], RestChecked, RestTFWithNRC).
%%--------------------------------------------------------------------
%% Process Unsubscribe
%%--------------------------------------------------------------------
@ -1213,13 +1188,8 @@ handle_call(Req, Channel) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_info({subscribe, TopicFilters}, Channel) ->
{_, NChannel} = lists:foldl(
fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
do_subscribe(TopicFilter, SubOpts, ChannelAcc)
end,
{[], Channel},
parse_topic_filters(TopicFilters)
),
NTopicFilters = enrich_subscribe(TopicFilters, Channel),
{_TopicFiltersWithRC, NChannel} = process_subscribe(NTopicFilters, Channel),
{ok, NChannel};
handle_info({unsubscribe, TopicFilters}, Channel) ->
{_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
@ -1857,49 +1827,156 @@ check_pub_caps(
) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
%%--------------------------------------------------------------------
%% Check Subscribe Packet
check_subscribe(SubPkt, _Channel) ->
case emqx_packet:check(SubPkt) of
ok -> ok;
{error, RC} -> {error, {disconnect, RC}}
end.
%%--------------------------------------------------------------------
%% Check Sub Authorization
check_sub_authzs(TopicFilters, Channel) ->
check_sub_authzs(TopicFilters, Channel, []).
check_sub_authzs(
[TopicFilter = {Topic, _} | More],
Channel = #channel{clientinfo = ClientInfo},
Acc
?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters0),
Channel = #channel{clientinfo = ClientInfo}
) ->
CheckResult = do_check_sub_authzs(TopicFilters0, ClientInfo),
HasAuthzDeny = lists:any(
fun({{_TopicFilter, _SubOpts}, ReasonCode}) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
end,
CheckResult
),
DenyAction = emqx:get_config([authorization, deny_action], ignore),
case DenyAction =:= disconnect andalso HasAuthzDeny of
true ->
{error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
false ->
{ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}
end.
do_check_sub_authzs(TopicFilters, ClientInfo) ->
do_check_sub_authzs(ClientInfo, TopicFilters, []).
do_check_sub_authzs(_ClientInfo, [], Acc) ->
lists:reverse(Acc);
do_check_sub_authzs(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc) ->
%% subsclibe authz check only cares the real topic filter when shared-sub
%% e.g. only check <<"t/#">> for <<"$share/g/t/#">>
Action = authz_action(TopicFilter),
case emqx_access_control:authorize(ClientInfo, Action, Topic) of
case
emqx_access_control:authorize(
ClientInfo,
Action,
emqx_topic:get_shared_real_topic(Topic)
)
of
%% TODO: support maximum QoS granted
%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7]
%% Not implemented yet:
%% {allow, RC} -> do_check_sub_authzs(ClientInfo, More, [{TopicFilter, RC} | Acc]);
allow ->
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_SUCCESS} | Acc]);
deny ->
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
end;
check_sub_authzs([], _Channel, Acc) ->
lists:reverse(Acc).
do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
end.
%%--------------------------------------------------------------------
%% Check Sub Caps
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
check_sub_caps(
?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters),
Channel = #channel{clientinfo = ClientInfo}
) ->
CheckResult = do_check_sub_caps(ClientInfo, TopicFilters),
{ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}.
do_check_sub_caps(ClientInfo, TopicFilters) ->
do_check_sub_caps(ClientInfo, TopicFilters, []).
do_check_sub_caps(_ClientInfo, [], Acc) ->
lists:reverse(Acc);
do_check_sub_caps(ClientInfo, [TopicFilter = {{Topic, SubOpts}, ?RC_SUCCESS} | More], Acc) ->
case emqx_mqtt_caps:check_sub(ClientInfo, Topic, SubOpts) of
ok ->
do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]);
{error, NRC} ->
?SLOG(
warning,
#{
msg => "cannot_subscribe_topic_filter",
reason => emqx_reason_codes:name(NRC)
},
#{topic => Topic}
),
do_check_sub_caps(ClientInfo, More, [{{Topic, SubOpts}, NRC} | Acc])
end;
do_check_sub_caps(ClientInfo, [TopicFilter = {{_Topic, _SubOpts}, _OtherRC} | More], Acc) ->
do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]).
%%--------------------------------------------------------------------
%% Enrich SubId
%% Run Subscribe Hooks
enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
[{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
enrich_subopts_subid(_Properties, TopicFilters) ->
TopicFilters.
run_sub_hooks(
?SUBSCRIBE_PACKET(_PacketId, Properties, TopicFilters0),
_Channel = #channel{clientinfo = ClientInfo}
) ->
TopicFilters = [
TopicFilter
|| {TopicFilter, ?RC_SUCCESS} <- TopicFilters0
],
_NTopicFilters = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters).
%%--------------------------------------------------------------------
%% Enrich SubOpts
enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
%% for api subscribe without sub-authz check and sub-caps check.
enrich_subscribe(TopicFilters, Channel) when is_list(TopicFilters) ->
do_enrich_subscribe(#{}, TopicFilters, Channel);
%% for mqtt clients sent subscribe packet.
enrich_subscribe(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel) ->
NTopicFilters = do_enrich_subscribe(Properties, TopicFilters, Channel),
{ok, ?SUBSCRIBE_PACKET(PacketId, Properties, NTopicFilters), Channel}.
do_enrich_subscribe(Properties, TopicFilters, Channel) ->
_NTopicFilters = run_fold(
[
%% TODO: do try catch with reason code here
fun(TFs, _) -> parse_raw_topic_filters(TFs) end,
fun enrich_subopts_subid/2,
fun enrich_subopts_porps/2,
fun enrich_subopts_flags/2
],
TopicFilters,
#{sub_props => Properties, channel => Channel}
).
enrich_subopts_subid(TopicFilters, #{sub_props := #{'Subscription-Identifier' := SubId}}) ->
[{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
enrich_subopts_subid(TopicFilters, _State) ->
TopicFilters.
enrich_subopts_porps(TopicFilters, #{sub_props := SubProps}) ->
[{Topic, SubOpts#{sub_props => SubProps}} || {Topic, SubOpts} <- TopicFilters].
enrich_subopts_flags(TopicFilters, #{channel := Channel}) ->
do_enrich_subopts_flags(TopicFilters, Channel).
do_enrich_subopts_flags(TopicFilters, ?IS_MQTT_V5) ->
[{Topic, merge_default_subopts(SubOpts)} || {Topic, SubOpts} <- TopicFilters];
do_enrich_subopts_flags(TopicFilters, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
Rap = flag(IsBridge),
NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)),
SubOpts#{rap => flag(IsBridge), nl => NL}.
[
{Topic, (merge_default_subopts(SubOpts))#{rap => Rap, nl => NL}}
|| {Topic, SubOpts} <- TopicFilters
].
merge_default_subopts(SubOpts) ->
maps:merge(?DEFAULT_SUBOPTS, SubOpts).
%%--------------------------------------------------------------------
%% Enrich ConnAck Caps
@ -2089,8 +2166,8 @@ maybe_shutdown(Reason, _Intent = shutdown, Channel) ->
%%--------------------------------------------------------------------
%% Parse Topic Filters
-compile({inline, [parse_topic_filters/1]}).
parse_topic_filters(TopicFilters) ->
%% [{<<"$share/group/topic">>, _SubOpts = #{}} | _]
parse_raw_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
%%--------------------------------------------------------------------

View File

@ -17,6 +17,7 @@
-module(emqx_mountpoint).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_placeholder.hrl").
-include("types.hrl").
@ -34,38 +35,54 @@
-spec mount(maybe(mountpoint()), Any) -> Any when
Any ::
emqx_types:topic()
| emqx_types:share()
| emqx_types:message()
| emqx_types:topic_filters().
mount(undefined, Any) ->
Any;
mount(MountPoint, Topic) when is_binary(Topic) ->
prefix(MountPoint, Topic);
mount(MountPoint, Msg = #message{topic = Topic}) ->
Msg#message{topic = prefix(MountPoint, Topic)};
mount(MountPoint, Topic) when ?IS_TOPIC(Topic) ->
prefix_maybe_share(MountPoint, Topic);
mount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) ->
Msg#message{topic = prefix_maybe_share(MountPoint, Topic)};
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
[{prefix_maybe_share(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
%% @private
-compile({inline, [prefix/2]}).
prefix(MountPoint, Topic) ->
<<MountPoint/binary, Topic/binary>>.
-spec prefix_maybe_share(maybe(mountpoint()), Any) -> Any when
Any ::
emqx_types:topic()
| emqx_types:share().
prefix_maybe_share(MountPoint, Topic) when
is_binary(MountPoint) andalso is_binary(Topic)
->
<<MountPoint/binary, Topic/binary>>;
prefix_maybe_share(MountPoint, #share{group = Group, topic = Topic}) when
is_binary(MountPoint) andalso is_binary(Topic)
->
#share{group = Group, topic = prefix_maybe_share(MountPoint, Topic)}.
-spec unmount(maybe(mountpoint()), Any) -> Any when
Any ::
emqx_types:topic()
| emqx_types:share()
| emqx_types:message().
unmount(undefined, Any) ->
Any;
unmount(MountPoint, Topic) when is_binary(Topic) ->
unmount(MountPoint, Topic) when ?IS_TOPIC(Topic) ->
unmount_maybe_share(MountPoint, Topic);
unmount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) ->
Msg#message{topic = unmount_maybe_share(MountPoint, Topic)}.
unmount_maybe_share(MountPoint, Topic) when
is_binary(MountPoint) andalso is_binary(Topic)
->
case string:prefix(Topic, MountPoint) of
nomatch -> Topic;
Topic1 -> Topic1
end;
unmount(MountPoint, Msg = #message{topic = Topic}) ->
case string:prefix(Topic, MountPoint) of
nomatch -> Msg;
Topic1 -> Msg#message{topic = Topic1}
end.
unmount_maybe_share(MountPoint, TopicFilter = #share{topic = Topic}) when
is_binary(MountPoint) andalso is_binary(Topic)
->
TopicFilter#share{topic = unmount_maybe_share(MountPoint, Topic)}.
-spec replvar(maybe(mountpoint()), map()) -> maybe(mountpoint()).
replvar(undefined, _Vars) ->

View File

@ -102,16 +102,19 @@ do_check_pub(_Flags, _Caps) ->
-spec check_sub(
emqx_types:clientinfo(),
emqx_types:topic(),
emqx_types:topic() | emqx_types:share(),
emqx_types:subopts()
) ->
ok_or_error(emqx_types:reason_code()).
check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
Caps = emqx_config:get_zone_conf(Zone, [mqtt]),
Flags = #{
%% TODO: qos check
%% (max_qos_allowed, Map) ->
%% max_qos_allowed => maps:get(max_qos_allowed, Caps, 2),
topic_levels => emqx_topic:levels(Topic),
is_wildcard => emqx_topic:wildcard(Topic),
is_shared => maps:is_key(share, SubOpts),
is_shared => erlang:is_record(Topic, share),
is_exclusive => maps:get(is_exclusive, SubOpts, false)
},
do_check_sub(Flags, Caps, ClientInfo, Topic).
@ -126,13 +129,19 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) when
is_binary(Topic)
->
case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
deny ->
{error, ?RC_QUOTA_EXCEEDED};
_ ->
ok
end;
%% for max_qos_allowed
%% see: RC_GRANTED_QOS_0, RC_GRANTED_QOS_1, RC_GRANTED_QOS_2
%% do_check_sub(_, _) ->
%% {ok, RC};
do_check_sub(_Flags, _Caps, _, _) ->
ok.

View File

@ -177,6 +177,7 @@ compat(connack, 16#9D) -> ?CONNACK_SERVER;
compat(connack, 16#9F) -> ?CONNACK_SERVER;
compat(suback, Code) when Code =< ?QOS_2 -> Code;
compat(suback, Code) when Code >= 16#80 -> 16#80;
%% TODO: 16#80(qos0) 16#81(qos1) 16#82(qos2) for mqtt-v3.1.1
compat(unsuback, _Code) -> undefined;
compat(_Other, _Code) -> undefined.

View File

@ -259,7 +259,7 @@ destroy(Session) ->
-spec subscribe(
clientinfo(),
emqx_types:topic(),
emqx_types:topic() | emqx_types:share(),
emqx_types:subopts(),
t()
) ->
@ -279,7 +279,7 @@ subscribe(ClientInfo, TopicFilter, SubOpts, Session) ->
-spec unsubscribe(
clientinfo(),
emqx_types:topic(),
emqx_types:topic() | emqx_types:share(),
emqx_types:subopts(),
t()
) ->
@ -409,6 +409,16 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
[Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
end.
enrich_deliver(
ClientInfo,
{deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}},
UpgradeQoS,
Session
) ->
%% Only QoS_1 and QoS_2 messages added `redispatch_to` header
%% For QoS 0 message, send it as regular dispatch
Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg},
enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session);
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
SubOpts = ?IMPL(Session):get_subscription(Topic, Session),
enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).

View File

@ -314,7 +314,7 @@ unsubscribe(
{error, ?RC_NO_SUBSCRIPTION_EXISTED}
end.
-spec get_subscription(emqx_types:topic(), session()) ->
-spec get_subscription(emqx_types:topic() | emqx_types:share(), session()) ->
emqx_types:subopts() | undefined.
get_subscription(Topic, #session{subscriptions = Subs}) ->
maps:get(Topic, Subs, undefined).

View File

@ -36,9 +36,16 @@
parse/2
]).
-export([
maybe_format_share/1,
get_shared_real_topic/1,
make_shared_record/2
]).
-type topic() :: emqx_types:topic().
-type word() :: emqx_types:word().
-type words() :: emqx_types:words().
-type share() :: emqx_types:share().
%% Guards
-define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST),
@ -50,7 +57,9 @@
%%--------------------------------------------------------------------
%% @doc Is wildcard topic?
-spec wildcard(topic() | words()) -> true | false.
-spec wildcard(topic() | share() | words()) -> true | false.
wildcard(#share{topic = Topic}) when is_binary(Topic) ->
wildcard(Topic);
wildcard(Topic) when is_binary(Topic) ->
wildcard(words(Topic));
wildcard([]) ->
@ -64,7 +73,7 @@ wildcard([_H | T]) ->
%% @doc Match Topic name with filter.
-spec match(Name, Filter) -> boolean() when
Name :: topic() | words(),
Name :: topic() | share() | words(),
Filter :: topic() | words().
match(<<$$, _/binary>>, <<$+, _/binary>>) ->
false;
@ -72,6 +81,8 @@ match(<<$$, _/binary>>, <<$#, _/binary>>) ->
false;
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter));
match(#share{} = Name, Filter) ->
match_share(Name, Filter);
match([], []) ->
true;
match([H | T1], [H | T2]) ->
@ -87,12 +98,26 @@ match([_H1 | _], []) ->
match([], [_H | _T2]) ->
false.
-spec match_share(Name, Filter) -> boolean() when
Name :: share(),
Filter :: topic() | share().
match_share(#share{topic = Name}, Filter) when is_binary(Filter) ->
%% only match real topic filter for normal topic filter.
match(words(Name), words(Filter));
match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = Filter}) ->
%% Matching real topic filter When subed same share group.
match(words(Name), words(Filter));
match_share(#share{}, _) ->
%% Otherwise, non-matched.
false.
-spec match_any(Name, [Filter]) -> boolean() when
Name :: topic() | words(),
Filter :: topic() | words().
match_any(Topic, Filters) ->
lists:any(fun(Filter) -> match(Topic, Filter) end, Filters).
%% TODO: validate share topic #share{} for emqx_trace.erl
%% @doc Validate topic name or filter
-spec validate(topic() | {name | filter, topic()}) -> true.
validate(Topic) when is_binary(Topic) ->
@ -107,7 +132,7 @@ validate(_, <<>>) ->
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
%% MQTT-5.0 [MQTT-4.7.3-3]
error(topic_too_long);
validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) ->
validate(filter, SharedFilter = <<?SHARE, "/", _Rest/binary>>) ->
validate_share(SharedFilter);
validate(filter, Filter) when is_binary(Filter) ->
validate2(words(Filter));
@ -139,12 +164,12 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest).
validate_share(<<"$share/", Rest/binary>>) when
validate_share(<<?SHARE, "/", Rest/binary>>) when
Rest =:= <<>> orelse Rest =:= <<"/">>
->
%% MQTT-5.0 [MQTT-4.8.2-1]
error(?SHARE_EMPTY_FILTER);
validate_share(<<"$share/", Rest/binary>>) ->
validate_share(<<?SHARE, "/", Rest/binary>>) ->
case binary:split(Rest, <<"/">>) of
%% MQTT-5.0 [MQTT-4.8.2-1]
[<<>>, _] ->
@ -156,7 +181,7 @@ validate_share(<<"$share/", Rest/binary>>) ->
validate_share(ShareName, Filter)
end.
validate_share(_, <<"$share/", _Rest/binary>>) ->
validate_share(_, <<?SHARE, "/", _Rest/binary>>) ->
error(?SHARE_RECURSIVELY);
validate_share(ShareName, Filter) ->
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
@ -185,7 +210,9 @@ bin('#') -> <<"#">>;
bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> list_to_binary(L).
-spec levels(topic()) -> pos_integer().
-spec levels(topic() | share()) -> pos_integer().
levels(#share{topic = Topic}) when is_binary(Topic) ->
levels(Topic);
levels(Topic) when is_binary(Topic) ->
length(tokens(Topic)).
@ -197,6 +224,8 @@ tokens(Topic) ->
%% @doc Split Topic Path to Words
-spec words(topic()) -> words().
words(#share{topic = Topic}) when is_binary(Topic) ->
words(Topic);
words(Topic) when is_binary(Topic) ->
[word(W) || W <- tokens(Topic)].
@ -237,26 +266,29 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
do_join(TopicAcc, [Word | Words]) ->
do_join(<<TopicAcc/binary, "/", (bin(Word))/binary>>, Words).
-spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}.
-spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}.
parse(TopicFilter) when is_binary(TopicFilter) ->
parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
-spec parse(topic(), map()) -> {topic(), map()}.
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(TopicFilter, Options#{share => <<"$queue">>});
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
-spec parse(topic() | share(), map()) -> {topic() | share(), map()}.
%% <<"$queue/[real_topic_filter]>">> equivalent to <<"$share/$queue/[real_topic_filter]">>
%% So the head of `real_topic_filter` MUST NOT be `<<$queue>>` or `<<$share>>`
parse(#share{topic = Topic = <<?QUEUE, "/", _/binary>>}, _Options) ->
error({invalid_topic_filter, Topic});
parse(#share{topic = Topic = <<?SHARE, "/", _/binary>>}, _Options) ->
error({invalid_topic_filter, Topic});
parse(<<?QUEUE, "/", Topic/binary>>, Options) ->
parse(#share{group = <<?QUEUE>>, topic = Topic}, Options);
parse(TopicFilter = <<?SHARE, "/", Rest/binary>>, Options) ->
case binary:split(Rest, <<"/">>) of
[_Any] ->
error({invalid_topic_filter, TopicFilter});
[ShareName, Filter] ->
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
nomatch -> parse(Filter, Options#{share => ShareName});
%% `Group` could be `$share` or `$queue`
[Group, Topic] ->
case binary:match(Group, [<<"+">>, <<"#">>]) of
nomatch -> parse(#share{group = Group, topic = Topic}, Options);
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
@ -267,5 +299,22 @@ parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
_ ->
{Topic, Options#{is_exclusive => true}}
end;
parse(TopicFilter, Options) ->
parse(TopicFilter, Options) when
?IS_TOPIC(TopicFilter)
->
{TopicFilter, Options}.
get_shared_real_topic(#share{topic = TopicFilter}) ->
TopicFilter;
get_shared_real_topic(TopicFilter) when is_binary(TopicFilter) ->
TopicFilter.
make_shared_record(Group, Topic) ->
#share{group = Group, topic = Topic}.
maybe_format_share(#share{group = <<?QUEUE>>, topic = Topic}) ->
join([<<?QUEUE>>, Topic]);
maybe_format_share(#share{group = Group, topic = Topic}) ->
join([<<?SHARE>>, Group, Topic]);
maybe_format_share(Topic) ->
join([Topic]).

View File

@ -40,6 +40,10 @@
words/0
]).
-export_type([
share/0
]).
-export_type([
socktype/0,
sockstate/0,
@ -136,11 +140,14 @@
-type subid() :: binary() | atom().
-type group() :: binary() | undefined.
%% '_' for match spec
-type group() :: binary() | '_'.
-type topic() :: binary().
-type word() :: '' | '+' | '#' | binary().
-type words() :: list(word()).
-type share() :: #share{}.
-type socktype() :: tcp | udp | ssl | proxy | atom().
-type sockstate() :: idle | running | blocked | closed.
-type conninfo() :: #{

View File

@ -299,7 +299,9 @@ t_nosub_pub(Config) when is_list(Config) ->
?assertEqual(1, emqx_metrics:val('messages.dropped')).
t_shared_subscribe({init, Config}) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
emqx_broker:subscribe(
emqx_topic:make_shared_record(<<"group">>, <<"topic">>), <<"clientid">>, #{}
),
ct:sleep(100),
Config;
t_shared_subscribe(Config) when is_list(Config) ->
@ -316,7 +318,7 @@ t_shared_subscribe(Config) when is_list(Config) ->
end
);
t_shared_subscribe({'end', _Config}) ->
emqx_broker:unsubscribe(<<"$share/group/topic">>).
emqx_broker:unsubscribe(emqx_topic:make_shared_record(<<"group">>, <<"topic">>)).
t_shared_subscribe_2({init, Config}) ->
Config;
@ -723,24 +725,6 @@ t_connack_auth_error(Config) when is_list(Config) ->
?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
ok.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
Hook = {?MODULE, client_subscribe_delete_all_hook, []},
ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100),
Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}),
ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
try
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
?assertEqual([?RC_UNSPECIFIED_ERROR], RCs),
ok
after
emqtt:disconnect(C)
end.
authenticate_deny(_Credentials, _Default) ->
{stop, {error, bad_username_or_password}}.
@ -800,7 +784,3 @@ recv_msgs(Count, Msgs) ->
after 100 ->
Msgs
end.
client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) ->
EmptyFilters = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilter],
{stop, EmptyFilters}.

View File

@ -456,7 +456,7 @@ t_process_subscribe(_) ->
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
TopicFilters = [TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}],
{[{TopicFilter, ?RC_SUCCESS}], _Channel} =
emqx_channel:process_subscribe(TopicFilters, #{}, channel()).
emqx_channel:process_subscribe(TopicFilters, channel()).
t_process_unsubscribe(_) ->
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end),
@ -914,7 +914,13 @@ t_check_pub_alias(_) ->
t_check_sub_authzs(_) ->
emqx_config:put_zone_conf(default, [authorization, enable], true),
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
SubPkt = ?SUBSCRIBE_PACKET(1, #{}, [TopicFilter]),
CheckedSubPkt = ?SUBSCRIBE_PACKET(1, #{}, [{TopicFilter, ?RC_SUCCESS}]),
Channel = channel(),
?assertEqual(
{ok, CheckedSubPkt, Channel},
emqx_channel:check_sub_authzs(SubPkt, Channel)
).
t_enrich_connack_caps(_) ->
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
@ -1061,6 +1067,7 @@ clientinfo(InitProps) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
is_bridge => false,
mountpoint => undefined
},
InitProps

View File

@ -29,6 +29,7 @@
).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
@ -52,6 +53,27 @@ t_mount(_) ->
mount(<<"device/1/">>, TopicFilters)
).
t_mount_share(_) ->
T = {TopicFilter, Opts} = emqx_topic:parse(<<"$share/group/topic">>),
TopicFilters = [T],
?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
%% should not mount share topic when make message.
Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
?assertEqual(
TopicFilter,
mount(undefined, TopicFilter)
),
?assertEqual(
#share{group = <<"group">>, topic = <<"device/1/topic">>},
mount(<<"device/1/">>, TopicFilter)
),
?assertEqual(
[{#share{group = <<"group">>, topic = <<"device/1/topic">>}, Opts}],
mount(<<"device/1/">>, TopicFilters)
).
t_unmount(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"device/1/topic">>, <<"payload">>),
?assertEqual(<<"topic">>, unmount(undefined, <<"topic">>)),
@ -61,6 +83,23 @@ t_unmount(_) ->
?assertEqual(<<"device/1/topic">>, unmount(<<"device/2/">>, <<"device/1/topic">>)),
?assertEqual(Msg#message{topic = <<"device/1/topic">>}, unmount(<<"device/2/">>, Msg)).
t_unmount_share(_) ->
{TopicFilter, _Opts} = emqx_topic:parse(<<"$share/group/topic">>),
MountedTopicFilter = #share{group = <<"group">>, topic = <<"device/1/topic">>},
?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
%% should not unmount share topic when make message.
Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
?assertEqual(
TopicFilter,
unmount(undefined, TopicFilter)
),
?assertEqual(
#share{group = <<"group">>, topic = <<"topic">>},
unmount(<<"device/1/">>, MountedTopicFilter)
).
t_replvar(_) ->
?assertEqual(undefined, replvar(undefined, #{})),
?assertEqual(

View File

@ -76,6 +76,8 @@ t_check_sub(_) ->
),
?assertEqual(
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})
emqx_mqtt_caps:check_sub(
ClientInfo, #share{group = <<"group">>, topic = <<"topic">>}, SubOpts
)
),
emqx_config:put([zones], OldConf).

View File

@ -137,7 +137,8 @@ t_random_basic(Config) when is_list(Config) ->
ClientId = <<"ClientId">>,
Topic = <<"foo">>,
Payload = <<"hello">>,
emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
Group = <<"group1">>,
emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}),
MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
%% wait for the subscription to show up
ct:sleep(200),
@ -402,7 +403,7 @@ t_hash(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
t_hash_clinetid(Config) when is_list(Config) ->
t_hash_clientid(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
@ -528,14 +529,15 @@ last_message(ExpectedPayload, Pids, Timeout) ->
t_dispatch(Config) when is_list(Config) ->
ok = ensure_config(random),
Topic = <<"foo">>,
Group = <<"group1">>,
?assertEqual(
{error, no_subscribers},
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}})
),
emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}),
?assertEqual(
{ok, 1},
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}})
).
t_uncovered_func(Config) when is_list(Config) ->
@ -991,37 +993,110 @@ t_session_kicked(Config) when is_list(Config) ->
?assertEqual([], collect_msgs(0)),
ok.
%% FIXME: currently doesn't work
%% t_different_groups_same_topic({init, Config}) ->
%% TestName = atom_to_binary(?FUNCTION_NAME),
%% ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
%% {ok, _} = emqtt:connect(C),
%% [{client, C}, {clientid, ClientId} | Config];
%% t_different_groups_same_topic({'end', Config}) ->
%% C = ?config(client, Config),
%% emqtt:stop(C),
%% ok;
%% t_different_groups_same_topic(Config) when is_list(Config) ->
%% C = ?config(client, Config),
%% ClientId = ?config(clientid, Config),
%% %% Subscribe and unsubscribe to both $queue and $shared topics
%% Topic = <<"t/1">>,
%% SharedTopic0 = <<"$share/aa/", Topic/binary>>,
%% SharedTopic1 = <<"$share/bb/", Topic/binary>>,
%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
-define(UPDATE_SUB_QOS(ConnPid, Topic, QoS),
?assertMatch({ok, _, [QoS]}, emqtt:subscribe(ConnPid, {Topic, QoS}))
).
%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
%% emqx:publish(Message0),
%% ?assertMatch([ {publish, #{payload := <<"hi">>}}
%% , {publish, #{payload := <<"hi">>}}
%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
t_different_groups_same_topic({init, Config}) ->
TestName = atom_to_binary(?FUNCTION_NAME),
ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
{ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
[{client, C}, {clientid, ClientId} | Config];
t_different_groups_same_topic({'end', Config}) ->
C = ?config(client, Config),
emqtt:stop(C),
ok;
t_different_groups_same_topic(Config) when is_list(Config) ->
C = ?config(client, Config),
ClientId = ?config(clientid, Config),
%% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
GroupA = <<"aa">>,
GroupB = <<"bb">>,
Topic = <<"t/1">>,
%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
SharedTopicGroupA = ?SHARE(GroupA, Topic),
?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
SharedTopicGroupB = ?SHARE(GroupB, Topic),
?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
%% ok.
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
begin
?assertEqual(2, length(emqx_router:match_routes(Topic)))
end
),
Message0 = emqx_message:make(ClientId, ?QOS_2, Topic, <<"hi">>),
emqx:publish(Message0),
?assertMatch(
[
{publish, #{payload := <<"hi">>}},
{publish, #{payload := <<"hi">>}}
],
collect_msgs(5_000),
#{routes => ets:tab2list(emqx_route)}
),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
ok.
t_different_groups_update_subopts({init, Config}) ->
TestName = atom_to_binary(?FUNCTION_NAME),
ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
{ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
[{client, C}, {clientid, ClientId} | Config];
t_different_groups_update_subopts({'end', Config}) ->
C = ?config(client, Config),
emqtt:stop(C),
ok;
t_different_groups_update_subopts(Config) when is_list(Config) ->
C = ?config(client, Config),
ClientId = ?config(clientid, Config),
%% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
Topic = <<"t/1">>,
GroupA = <<"aa">>,
GroupB = <<"bb">>,
SharedTopicGroupA = ?SHARE(GroupA, Topic),
SharedTopicGroupB = ?SHARE(GroupB, Topic),
Fun = fun(Group, QoS) ->
?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS),
?assertMatch(
#{qos := QoS},
emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
)
end,
[Fun(Group, QoS) || QoS <- [?QOS_0, ?QOS_1, ?QOS_2], Group <- [GroupA, GroupB]],
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
begin
?assertEqual(2, length(emqx_router:match_routes(Topic)))
end
),
Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
emqx:publish(Message0),
?assertMatch(
[
{publish, #{payload := <<"hi">>}},
{publish, #{payload := <<"hi">>}}
],
collect_msgs(5_000),
#{routes => ets:tab2list(emqx_route)}
),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
ok.
t_queue_subscription({init, Config}) ->
TestName = atom_to_binary(?FUNCTION_NAME),
@ -1038,23 +1113,19 @@ t_queue_subscription({'end', Config}) ->
t_queue_subscription(Config) when is_list(Config) ->
C = ?config(client, Config),
ClientId = ?config(clientid, Config),
%% Subscribe and unsubscribe to both $queue and $shared topics
%% Subscribe and unsubscribe to both $queue share and $share/<group> with same topic
Topic = <<"t/1">>,
QueueTopic = <<"$queue/", Topic/binary>>,
SharedTopic = <<"$share/aa/", Topic/binary>>,
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
%% FIXME: we should actually see 2 routes, one for each group
%% ($queue and aa), but currently the latest subscription
%% overwrites the existing one.
?UPDATE_SUB_QOS(C, QueueTopic, ?QOS_2),
?UPDATE_SUB_QOS(C, SharedTopic, ?QOS_2),
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
begin
ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
%% FIXME: should ensure we have 2 subscriptions
[_] = emqx_router:lookup_routes(Topic)
?assertEqual(2, length(emqx_router:match_routes(Topic)))
end
),
@ -1063,37 +1134,29 @@ t_queue_subscription(Config) when is_list(Config) ->
emqx:publish(Message0),
?assertMatch(
[
{publish, #{payload := <<"hi">>}},
{publish, #{payload := <<"hi">>}}
%% FIXME: should receive one message from each group
%% , {publish, #{payload := <<"hi">>}}
],
collect_msgs(5_000)
collect_msgs(5_000),
#{routes => ets:tab2list(emqx_route)}
),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
%% FIXME: return code should be success instead of 17 ("no_subscription_existed")
{ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopic),
%% FIXME: this should eventually be true, but currently we leak
%% the previous group subscription...
%% ?retry(
%% _Sleep0 = 100,
%% _Attempts0 = 50,
%% begin
%% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
%% [] = emqx_router:lookup_routes(Topic)
%% end
%% ),
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
begin
?assertEqual(0, length(emqx_router:match_routes(Topic)))
end
),
ct:sleep(500),
Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
emqx:publish(Message1),
%% FIXME: we should *not* receive any messages...
%% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
%% This is from the leaked group...
?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
routes => ets:tab2list(emqx_route)
}),
%% we should *not* receive any messages.
?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
ok.

View File

@ -238,11 +238,11 @@ long_topic() ->
t_parse(_) ->
?assertError(
{invalid_topic_filter, <<"$queue/t">>},
parse(<<"$queue/t">>, #{share => <<"g">>})
parse(#share{group = <<"$queue">>, topic = <<"$queue/t">>}, #{})
),
?assertError(
{invalid_topic_filter, <<"$share/g/t">>},
parse(<<"$share/g/t">>, #{share => <<"g">>})
parse(#share{group = <<"g">>, topic = <<"$share/g/t">>}, #{})
),
?assertError(
{invalid_topic_filter, <<"$share/t">>},
@ -254,8 +254,12 @@ t_parse(_) ->
),
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
?assertEqual(
{#share{group = <<"$queue">>, topic = <<"topic">>}, #{}}, parse(<<"$queue/topic">>)
),
?assertEqual(
{#share{group = <<"group">>, topic = <<"topic">>}, #{}}, parse(<<"$share/group/topic">>)
),
%% The '$local' and '$fastlane' topics have been deprecated.
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),

View File

@ -96,7 +96,7 @@ choose_ingress_pool_size(
#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
) ->
case emqx_topic:parse(RemoteTopic) of
{_Filter, #{share := _Name}} ->
{#share{} = _Filter, _SubOpts} ->
% NOTE: this is shared subscription, many workers may subscribe
PoolSize;
{_Filter, #{}} when PoolSize > 1 ->

View File

@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) ->
Req = #{
clientinfo => clientinfo(ClientInfo),
type => Type,
topic => Topic,
topic => emqx_topic:maybe_format_share(Topic),
result => Bool
},
case
@ -191,7 +191,7 @@ on_session_created(ClientInfo, _SessInfo) ->
on_session_subscribed(ClientInfo, Topic, SubOpts) ->
Req = #{
clientinfo => clientinfo(ClientInfo),
topic => Topic,
topic => emqx_topic:maybe_format_share(Topic),
subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
},
cast('session.subscribed', Req).
@ -199,7 +199,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) ->
on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
Req = #{
clientinfo => clientinfo(ClientInfo),
topic => Topic
topic => emqx_topic:maybe_format_share(Topic)
},
cast('session.unsubscribed', Req).
@ -413,7 +413,13 @@ enrich_header(Headers, Message) ->
end.
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
GetQos = fun(SubOpts) ->
maps:get(qos, SubOpts, 0)
end,
[
#{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)}
|| {Topic, SubOpts} <- Tfs
].
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));

View File

@ -547,6 +547,7 @@ subopts(SubOpts) ->
rh => maps:get(rh, SubOpts, 0),
rap => maps:get(rap, SubOpts, 0),
nl => maps:get(nl, SubOpts, 0),
%% TOOD: FIXME for share-sub refactored
share => maps:get(share, SubOpts, <<>>)
}.

View File

@ -629,15 +629,8 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
{200, []};
{Node, Subs} ->
Formatter =
fun({Topic, SubOpts}) ->
maps:merge(
#{
node => Node,
clientid => ClientID,
topic => Topic
},
maps:with([qos, nl, rap, rh], SubOpts)
)
fun(_Sub = {Topic, SubOpts}) ->
emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts})
end,
{200, lists:map(Formatter, Subs)}
end.

View File

@ -20,6 +20,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -38,8 +39,6 @@
format/2
]).
-define(SUBS_QTABLE, emqx_suboption).
-define(SUBS_QSCHEMA, [
{<<"clientid">>, binary},
{<<"topic">>, binary},
@ -146,7 +145,7 @@ subscriptions(get, #{query_string := QString}) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
emqx_mgmt_api:cluster_query(
?SUBS_QTABLE,
?SUBOPTION,
QString,
?SUBS_QSCHEMA,
fun ?MODULE:qs2ms/2,
@ -157,7 +156,7 @@ subscriptions(get, #{query_string := QString}) ->
{ok, Node1} ->
emqx_mgmt_api:node_query(
Node1,
?SUBS_QTABLE,
?SUBOPTION,
QString,
?SUBS_QSCHEMA,
fun ?MODULE:qs2ms/2,
@ -177,23 +176,16 @@ subscriptions(get, #{query_string := QString}) ->
{200, Result}
end.
format(WhichNode, {{Topic, _Subscriber}, Options}) ->
format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
maps:merge(
#{
topic => get_topic(Topic, Options),
clientid => maps:get(subid, Options, null),
topic => emqx_topic:maybe_format_share(Topic),
clientid => maps:get(subid, SubOpts, null),
node => WhichNode
},
maps:with([qos, nl, rap, rh], Options)
maps:with([qos, nl, rap, rh], SubOpts)
).
get_topic(Topic, #{share := <<"$queue">> = Group}) ->
emqx_topic:join([Group, Topic]);
get_topic(Topic, #{share := Group}) ->
emqx_topic:join([<<"$share">>, Group, Topic]);
get_topic(Topic, _) ->
Topic.
%%--------------------------------------------------------------------
%% QueryString to MatchSpec
%%--------------------------------------------------------------------
@ -213,10 +205,18 @@ gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
update_ms(clientid, X, {{Topic, Pid}, Opts}) ->
{{Topic, Pid}, Opts#{subid => X}};
update_ms(topic, X, {{_Topic, Pid}, Opts}) ->
update_ms(topic, X, {{Topic, Pid}, Opts}) when
is_record(Topic, share)
->
{{#share{group = '_', topic = X}, Pid}, Opts};
update_ms(topic, X, {{Topic, Pid}, Opts}) when
is_binary(Topic) orelse Topic =:= '_'
->
{{X, Pid}, Opts};
update_ms(share_group, X, {{Topic, Pid}, Opts}) ->
{{Topic, Pid}, Opts#{share => X}};
update_ms(share_group, X, {{Topic, Pid}, Opts}) when
not is_record(Topic, share)
->
{{#share{group = X, topic = Topic}, Pid}, Opts};
update_ms(qos, X, {{Topic, Pid}, Opts}) ->
{{Topic, Pid}, Opts#{qos => X}}.
@ -227,5 +227,6 @@ fuzzy_filter_fun(Fuzzy) ->
run_fuzzy_filter(_, []) ->
true;
run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
{Filter, _SubOpts} = emqx_topic:parse(TopicFilter),
emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy).

View File

@ -17,6 +17,7 @@
-module(emqx_mgmt_api_topics).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -101,10 +102,10 @@ fields(topic) ->
%%%==============================================================================================
%% parameters trans
topics(get, #{query_string := Qs}) ->
do_list(generate_topic(Qs)).
do_list(Qs).
topic(get, #{bindings := Bindings}) ->
lookup(generate_topic(Bindings)).
lookup(Bindings).
%%%==============================================================================================
%% api apply
@ -139,13 +140,6 @@ lookup(#{topic := Topic}) ->
%%%==============================================================================================
%% internal
generate_topic(Params = #{<<"topic">> := Topic}) ->
Params#{<<"topic">> => Topic};
generate_topic(Params = #{topic := Topic}) ->
Params#{topic => Topic};
generate_topic(Params) ->
Params.
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {Qs, _}) ->
#{
@ -160,9 +154,9 @@ gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
format(#route{topic = Topic, dest = {_, Node}}) ->
#{topic => Topic, node => Node};
format(#route{topic = Topic, dest = Node}) ->
format(#route{topic = Topic, dest = {Group, Node}}) ->
#{topic => ?SHARE(Group, Topic), node => Node};
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
#{topic => Topic, node => Node}.
topic_param(In) ->

View File

@ -150,11 +150,12 @@ compile(Rules) ->
Rules
).
%% FIXME: rewrite #share{} and return #share{}, not formated $share/group/topic
match_and_rewrite(Topic, [], _) ->
Topic;
match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) ->
case emqx_topic:match(Topic, Filter) of
true -> rewrite(Topic, MP, Dest, Binds);
true -> rewrite(emqx_topic:get_shared_real_topic(Topic), MP, Dest, Binds);
false -> match_and_rewrite(Topic, Rules, Binds)
end.

View File

@ -20,6 +20,7 @@
-include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-export([start_link/0]).
@ -87,7 +88,7 @@
%% Hook API
%%------------------------------------------------------------------------------
-spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any().
on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined ->
on_session_subscribed(_, #share{} = _Topic, _SubOpts, _) ->
ok;
on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
IsNew = maps:get(is_new, Opts, true),

View File

@ -190,7 +190,9 @@ on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) ->
apply_event(
'session.subscribed',
fun() ->
eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
eventmsg_sub_or_unsub(
'session.subscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts
)
end,
Conf
).
@ -199,7 +201,9 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) ->
apply_event(
'session.unsubscribed',
fun() ->
eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
eventmsg_sub_or_unsub(
'session.unsubscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts
)
end,
Conf
).

View File

@ -0,0 +1,2 @@
Fix topic-filter overlapping handling in shared subscription.
In the previous implementation, the storage method for subscription options did not provide adequate support for shared subscriptions. This resulted in message routing failures and leakage of routing tables between nodes during the "subscribe-unsubscribe" process with specific order and topics.