feat(gw-sn): call subscribe/unsubscribe hook

This commit is contained in:
JianBo He 2021-07-30 14:49:54 +08:00
parent c986f89319
commit 602f0ebb60
2 changed files with 44 additions and 18 deletions

View File

@ -845,19 +845,31 @@ check_subscribe_authz({_TopicId, TopicName, _QoS},
do_subscribe({TopicId, TopicName, QoS},
Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
SubOpts = maps:merge(?DEFAULT_SUBOPTS, #{qos => QoS}),
case emqx_session:subscribe(ClientInfo, NTopicName, SubOpts, Session) of
{ok, NSession} ->
{ok, {TopicId, QoS},
Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} ->
?LOG(warning, "Cannot subscribe ~s due to ~s.",
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]),
{error, ?SN_EXCEED_LIMITATION}
{TopicName1, SubOpts0} = emqx_topic:parse(TopicName),
TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}],
case run_hooks(Ctx, 'client.subscribe',
[ClientInfo, #{}], TopicFilters) of
[] ->
?LOG(warning, "Skip to subscribe ~s, "
"due to 'client.subscribe' denied!", [TopicName]),
{ok, Channel};
[{NTopicName, NSubOpts}|_] ->
NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName),
NSubOpts1 = maps:merge(?DEFAULT_SUBOPTS, NSubOpts),
case emqx_session:subscribe(ClientInfo, NTopicName1, NSubOpts1, Session) of
{ok, NSession} ->
{ok, {TopicId, QoS},
Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} ->
?LOG(warning, "Cannot subscribe ~s due to ~s.",
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]),
{error, ?SN_EXCEED_LIMITATION}
end
end.
%%--------------------------------------------------------------------
@ -891,16 +903,29 @@ preproc_unsub_type(?SN_UNSUBSCRIBE_MSG_TYPE(?SN_SHORT_TOPIC,
do_unsubscribe(TopicName,
Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
case emqx_session:unsubscribe(ClientInfo, NTopicName,
?DEFAULT_SUBOPTS, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
{ok, Channel}
TopicFilters = [emqx_topic:parse(TopicName)],
case run_hooks(Ctx, 'client.unsubscribe',
[ClientInfo, #{}], TopicFilters) of
[] ->
%% Skip to unsubscribe
{ok, Channel};
[{NTopicName, NSubOpts}|_] ->
NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName),
NSubOpts1 = maps:merge(
emqx_gateway_utils:default_subopts(),
NSubOpts
),
case emqx_session:unsubscribe(ClientInfo, NTopicName1,
NSubOpts1, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
{ok, Channel}
end
end.
%%--------------------------------------------------------------------

View File

@ -155,7 +155,8 @@ init([InstaId, PredefTopics]) ->
% FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName}, AccId) ->
fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicId},
value = TopicName}