From 1f964d5a7cf313246500b2625ebe1f47b197c208 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 27 May 2020 10:57:44 +0800 Subject: [PATCH] Subscribe or unsubscribe via HTTP API skip ACL checking --- src/emqx_channel.erl | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7cfbd60d5..3f96d9a2e 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -485,6 +485,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = {error, RC} -> {RC, Channel} end. +-compile({inline, [process_force_subscribe/2]}). +process_force_subscribe(Subscriptions, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, + session = Session}) -> + lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) -> + NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), + NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc), + case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of + {ok, NSession} -> + {ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}}; + {error, ReasonCode} -> + {ReasonCodes ++ [ReasonCode], ChannelAcc} + end + end, {[], Channel}, Subscriptions). + %%-------------------------------------------------------------------- %% Process Unsubscribe %%-------------------------------------------------------------------- @@ -510,6 +525,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = {error, RC} -> {RC, Channel} end. +-compile({inline, [process_force_unsubscribe/2]}). +process_force_unsubscribe(Subscriptions, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, + session = Session}) -> + lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) -> + NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), + case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of + {ok, NSession} -> + {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}}; + {error, ReasonCode} -> + {ReasonCodes ++ [ReasonCode], ChannelAcc} + end + end, {[], Channel}, Subscriptions). + %%-------------------------------------------------------------------- %% Process Disconnect %%-------------------------------------------------------------------- @@ -763,6 +792,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; +handle_info({force_subscribe, TopicFilters}, Channel) -> + {_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel), + {ok, NChannel}; + handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> TopicFilters1 = run_hooks('client.unsubscribe', [ClientInfo, #{'Internal' => true}], @@ -771,6 +804,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; +handle_info({force_unsubscribe, TopicFilters}, Channel) -> + {_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel), + {ok, NChannel}; + handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel);