diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index d6a868da8..d7717fdc4 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -845,19 +845,31 @@ check_subscribe_authz({_TopicId, TopicName, _QoS}, do_subscribe({TopicId, TopicName, QoS}, Channel = #channel{ + ctx = Ctx, session = Session, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> - NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - SubOpts = maps:merge(?DEFAULT_SUBOPTS, #{qos => QoS}), - case emqx_session:subscribe(ClientInfo, NTopicName, SubOpts, Session) of - {ok, NSession} -> - {ok, {TopicId, QoS}, - Channel#channel{session = NSession}}; - {error, ?RC_QUOTA_EXCEEDED} -> - ?LOG(warning, "Cannot subscribe ~s due to ~s.", - [TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), - {error, ?SN_EXCEED_LIMITATION} + + {TopicName1, SubOpts0} = emqx_topic:parse(TopicName), + TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}], + case run_hooks(Ctx, 'client.subscribe', + [ClientInfo, #{}], TopicFilters) of + [] -> + ?LOG(warning, "Skip to subscribe ~s, " + "due to 'client.subscribe' denied!", [TopicName]), + {ok, Channel}; + [{NTopicName, NSubOpts}|_] -> + NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName), + NSubOpts1 = maps:merge(?DEFAULT_SUBOPTS, NSubOpts), + case emqx_session:subscribe(ClientInfo, NTopicName1, NSubOpts1, Session) of + {ok, NSession} -> + {ok, {TopicId, QoS}, + Channel#channel{session = NSession}}; + {error, ?RC_QUOTA_EXCEEDED} -> + ?LOG(warning, "Cannot subscribe ~s due to ~s.", + [TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), + {error, ?SN_EXCEED_LIMITATION} + end end. %%-------------------------------------------------------------------- @@ -891,16 +903,29 @@ preproc_unsub_type(?SN_UNSUBSCRIBE_MSG_TYPE(?SN_SHORT_TOPIC, do_unsubscribe(TopicName, Channel = #channel{ + ctx = Ctx, session = Session, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> - NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - case emqx_session:unsubscribe(ClientInfo, NTopicName, - ?DEFAULT_SUBOPTS, Session) of - {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; - {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> - {ok, Channel} + TopicFilters = [emqx_topic:parse(TopicName)], + case run_hooks(Ctx, 'client.unsubscribe', + [ClientInfo, #{}], TopicFilters) of + [] -> + %% Skip to unsubscribe + {ok, Channel}; + [{NTopicName, NSubOpts}|_] -> + NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName), + NSubOpts1 = maps:merge( + emqx_gateway_utils:default_subopts(), + NSubOpts + ), + case emqx_session:unsubscribe(ClientInfo, NTopicName1, + NSubOpts1, Session) of + {ok, NSession} -> + {ok, Channel#channel{session = NSession}}; + {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> + {ok, Channel} + end end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 0fac56ae0..1249831cc 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -155,7 +155,8 @@ init([InstaId, PredefTopics]) -> % FIXME: %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( - fun(#{id := TopicId, topic := TopicName}, AccId) -> + fun(#{id := TopicId, topic := TopicName0}, AccId) -> + TopicName = iolist_to_binary(TopicName0), ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ key = {predef, TopicId}, value = TopicName}