Subscribe or unsubscribe via HTTP API skip ACL checking
This commit is contained in:
parent
36f9cf0ac9
commit
1f964d5a7c
|
@ -485,6 +485,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-compile({inline, [process_force_subscribe/2]}).
|
||||||
|
process_force_subscribe(Subscriptions, Channel =
|
||||||
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
||||||
|
session = Session}) ->
|
||||||
|
lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
|
||||||
|
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
|
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
|
||||||
|
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
||||||
|
{ok, NSession} ->
|
||||||
|
{ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
|
||||||
|
{error, ReasonCode} ->
|
||||||
|
{ReasonCodes ++ [ReasonCode], ChannelAcc}
|
||||||
|
end
|
||||||
|
end, {[], Channel}, Subscriptions).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process Unsubscribe
|
%% Process Unsubscribe
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -510,6 +525,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-compile({inline, [process_force_unsubscribe/2]}).
|
||||||
|
process_force_unsubscribe(Subscriptions, Channel =
|
||||||
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
||||||
|
session = Session}) ->
|
||||||
|
lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
||||||
|
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
|
case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
|
||||||
|
{ok, NSession} ->
|
||||||
|
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
|
||||||
|
{error, ReasonCode} ->
|
||||||
|
{ReasonCodes ++ [ReasonCode], ChannelAcc}
|
||||||
|
end
|
||||||
|
end, {[], Channel}, Subscriptions).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process Disconnect
|
%% Process Disconnect
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -763,6 +792,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
|
||||||
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
||||||
{ok, NChannel};
|
{ok, NChannel};
|
||||||
|
|
||||||
|
handle_info({force_subscribe, TopicFilters}, Channel) ->
|
||||||
|
{_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
|
||||||
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||||
TopicFilters1 = run_hooks('client.unsubscribe',
|
TopicFilters1 = run_hooks('client.unsubscribe',
|
||||||
[ClientInfo, #{'Internal' => true}],
|
[ClientInfo, #{'Internal' => true}],
|
||||||
|
@ -771,6 +804,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
|
||||||
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||||
{ok, NChannel};
|
{ok, NChannel};
|
||||||
|
|
||||||
|
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
|
||||||
|
{_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
|
||||||
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||||
shutdown(Reason, Channel);
|
shutdown(Reason, Channel);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue