feat(gw-stomp): run subscribe hooks

This commit is contained in:
JianBo He 2021-07-29 09:13:55 +08:00
parent a116c0afd1
commit f87bef9ffb
2 changed files with 82 additions and 24 deletions

View File

@ -43,6 +43,7 @@
-export([ default_tcp_options/0
, default_udp_options/0
, default_subopts/0
]).
-define(ACTIVE_N, 100).
@ -186,3 +187,10 @@ default_tcp_options() ->
default_udp_options() ->
[binary].
default_subopts() ->
#{rh => 0, %% Retain Handling
rap => 0, %% Retain as Publish
nl => 0, %% No Local
qos => 0 %% QoS
}.

View File

@ -373,34 +373,36 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
Channel = #channel{
ctx = Ctx,
subscriptions = Subs,
clientinfo = ClientInfo = #{mountpoint := Mountpoint}
clientinfo = ClientInfo
}) ->
SubId = header(<<"id">>, Headers),
Topic = header(<<"destination">>, Headers),
Ack = header(<<"ack">>, Headers, <<"auto">>),
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
case lists:keyfind(SubId, 1, Subs) of
{SubId, MountedTopic, Ack} ->
maybe_outgoing_receipt(receipt_id(Headers), Channel);
{SubId, _OtherTopic, _OtherAck} ->
%% FIXME:
?LOG(error, "Conflicts with subscribed topics ~s, id: ~s",
[_OtherTopic, SubId]),
ErrMsg = "Conflict subscribe id ",
handle_out(error, {receipt_id(Headers), ErrMsg}, Channel);
false ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
deny ->
handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel);
allow ->
_ = emqx_broker:subscribe(MountedTopic),
maybe_outgoing_receipt(
receipt_id(Headers),
Channel#channel{subscriptions = [{SubId, MountedTopic, Ack} | Subs]}
)
end
case emqx_misc:pipeline(
[ fun parse_topic_filter/2
, fun check_subscribed_status/2
, fun check_sub_acl/2
], {SubId, Topic}, Channel) of
{ok, {_, TopicFilter}, NChannel} ->
TopicFilters = [TopicFilter],
NTopicFilters = run_hooks(Ctx, 'client.subscribe',
[ClientInfo, #{}], TopicFilters),
case do_subscribe(NTopicFilters, NChannel) of
[] ->
ErrMsg = "Permission denied",
handle_out(error, {receipt_id(Headers), ErrMsg}, Channel);
[MountedTopic|_] ->
NChannel1 = NChannel#channel{
subscriptions = [{SubId, MountedTopic, Ack}
| Subs]
},
handle_out(receipt, receipt_id(Headers), NChannel1)
end;
{error, ErrMsg, NChannel} ->
?LOG(error, "Failed to subscribe topic ~s, reason: ~s",
[Topic, ErrMsg]),
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
end;
handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers),
@ -521,6 +523,54 @@ trans_pipeline([{Func, Args}|More], Outgoings, Channel) ->
{error, Reason, Channel}
end.
%%--------------------------------------------------------------------
%% Subs
parse_topic_filter({SubId, Topic}, Channel) ->
TopicFilter = emqx_topic:parse(Topic),
{ok, {SubId, TopicFilter}, Channel}.
check_subscribed_status({SubId, TopicFilter},
#channel{
subscriptions = Subs,
clientinfo = #{mountpoint := Mountpoint}
}) ->
MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter),
case lists:keyfind(SubId, 1, Subs) of
{SubId, MountedTopic, _Ack} ->
ok;
{SubId, _OtherTopic, _Ack} ->
{error, "Conflict subscribe id"};
false ->
ok
end.
check_sub_acl({_SubId, TopicFilter},
#channel{
ctx = Ctx,
clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
deny -> {error, "ACL Deny"};
allow -> ok
end.
do_subscribe(TopicFilters, Channel) ->
do_subscribe(TopicFilters, Channel, []).
do_subscribe([], _Channel, Acc) ->
lists:reverse(Acc);
do_subscribe([{TopicFilter, Option}|More],
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
= #{clientid := ClientId,
mountpoint := Mountpoint}}, Acc) ->
SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), Option),
MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter),
_ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts),
run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]),
do_subscribe(More, Channel, [MountedTopic|Acc]).
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------