From f87bef9ffb33fd32a1e4cbb534f46f2fce8b78b1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 29 Jul 2021 09:13:55 +0800 Subject: [PATCH] feat(gw-stomp): run subscribe hooks --- apps/emqx_gateway/src/emqx_gateway_utils.erl | 8 ++ .../src/stomp/emqx_stomp_channel.erl | 98 ++++++++++++++----- 2 files changed, 82 insertions(+), 24 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 8adf06249..74bc3a8ce 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -43,6 +43,7 @@ -export([ default_tcp_options/0 , default_udp_options/0 + , default_subopts/0 ]). -define(ACTIVE_N, 100). @@ -186,3 +187,10 @@ default_tcp_options() -> default_udp_options() -> [binary]. + +default_subopts() -> + #{rh => 0, %% Retain Handling + rap => 0, %% Retain as Publish + nl => 0, %% No Local + qos => 0 %% QoS + }. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index e0fb4b82e..6c1d678e1 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -373,34 +373,36 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers), Channel = #channel{ ctx = Ctx, subscriptions = Subs, - clientinfo = ClientInfo = #{mountpoint := Mountpoint} + clientinfo = ClientInfo }) -> + SubId = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), - - MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), - - case lists:keyfind(SubId, 1, Subs) of - {SubId, MountedTopic, Ack} -> - maybe_outgoing_receipt(receipt_id(Headers), Channel); - {SubId, _OtherTopic, _OtherAck} -> - %% FIXME: - ?LOG(error, "Conflicts with subscribed topics ~s, id: ~s", - [_OtherTopic, SubId]), - ErrMsg = "Conflict subscribe id ", - handle_out(error, {receipt_id(Headers), ErrMsg}, Channel); - false -> - case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of - deny -> - handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel); - allow -> - _ = emqx_broker:subscribe(MountedTopic), - maybe_outgoing_receipt( - receipt_id(Headers), - Channel#channel{subscriptions = [{SubId, MountedTopic, Ack} | Subs]} - ) - end + case emqx_misc:pipeline( + [ fun parse_topic_filter/2 + , fun check_subscribed_status/2 + , fun check_sub_acl/2 + ], {SubId, Topic}, Channel) of + {ok, {_, TopicFilter}, NChannel} -> + TopicFilters = [TopicFilter], + NTopicFilters = run_hooks(Ctx, 'client.subscribe', + [ClientInfo, #{}], TopicFilters), + case do_subscribe(NTopicFilters, NChannel) of + [] -> + ErrMsg = "Permission denied", + handle_out(error, {receipt_id(Headers), ErrMsg}, Channel); + [MountedTopic|_] -> + NChannel1 = NChannel#channel{ + subscriptions = [{SubId, MountedTopic, Ack} + | Subs] + }, + handle_out(receipt, receipt_id(Headers), NChannel1) + end; + {error, ErrMsg, NChannel} -> + ?LOG(error, "Failed to subscribe topic ~s, reason: ~s", + [Topic, ErrMsg]), + handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel) end; handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers), @@ -521,6 +523,54 @@ trans_pipeline([{Func, Args}|More], Outgoings, Channel) -> {error, Reason, Channel} end. +%%-------------------------------------------------------------------- +%% Subs + +parse_topic_filter({SubId, Topic}, Channel) -> + TopicFilter = emqx_topic:parse(Topic), + {ok, {SubId, TopicFilter}, Channel}. + +check_subscribed_status({SubId, TopicFilter}, + #channel{ + subscriptions = Subs, + clientinfo = #{mountpoint := Mountpoint} + }) -> + MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter), + case lists:keyfind(SubId, 1, Subs) of + {SubId, MountedTopic, _Ack} -> + ok; + {SubId, _OtherTopic, _Ack} -> + {error, "Conflict subscribe id"}; + false -> + ok + end. + +check_sub_acl({_SubId, TopicFilter}, + #channel{ + ctx = Ctx, + clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of + deny -> {error, "ACL Deny"}; + allow -> ok + end. + +do_subscribe(TopicFilters, Channel) -> + do_subscribe(TopicFilters, Channel, []). + +do_subscribe([], _Channel, Acc) -> + lists:reverse(Acc); +do_subscribe([{TopicFilter, Option}|More], + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{clientid := ClientId, + mountpoint := Mountpoint}}, Acc) -> + SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), Option), + MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter), + _ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts), + run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]), + do_subscribe(More, Channel, [MountedTopic|Acc]). + %%-------------------------------------------------------------------- %% Handle outgoing packet %%--------------------------------------------------------------------