Merge pull request #3496 from emqx/force_subscribe_4.0
Subscribe or unsubscribe via HTTP API skip ACL checking
This commit is contained in:
commit
0f5bc86dff
|
@ -485,6 +485,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|||
{error, RC} -> {RC, Channel}
|
||||
end.
|
||||
|
||||
-compile({inline, [process_force_subscribe/2]}).
|
||||
process_force_subscribe(Subscriptions, Channel =
|
||||
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
||||
session = Session}) ->
|
||||
lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
|
||||
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
|
||||
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
||||
{ok, NSession} ->
|
||||
{ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
|
||||
{error, ReasonCode} ->
|
||||
{ReasonCodes ++ [ReasonCode], ChannelAcc}
|
||||
end
|
||||
end, {[], Channel}, Subscriptions).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process Unsubscribe
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -510,6 +525,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|||
{error, RC} -> {RC, Channel}
|
||||
end.
|
||||
|
||||
-compile({inline, [process_force_unsubscribe/2]}).
|
||||
process_force_unsubscribe(Subscriptions, Channel =
|
||||
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
||||
session = Session}) ->
|
||||
lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
||||
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||
case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
|
||||
{ok, NSession} ->
|
||||
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
|
||||
{error, ReasonCode} ->
|
||||
{ReasonCodes ++ [ReasonCode], ChannelAcc}
|
||||
end
|
||||
end, {[], Channel}, Subscriptions).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process Disconnect
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -763,6 +792,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
|
|||
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({force_subscribe, TopicFilters}, Channel) ->
|
||||
{_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
TopicFilters1 = run_hooks('client.unsubscribe',
|
||||
[ClientInfo, #{'Internal' => true}],
|
||||
|
@ -771,6 +804,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
|
|||
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
|
||||
{_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||
shutdown(Reason, Channel);
|
||||
|
||||
|
|
Loading…
Reference in New Issue