diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 4d66d5cdc..97a1fce23 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -391,22 +391,30 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> case emqx_packet:check(Packet) of - ok -> TopicFilters1 = parse_topic_filters(TopicFilters), - TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1), - TopicFilters3 = run_hooks('client.subscribe', - [ClientInfo, Properties], - TopicFilters2 - ), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel), - case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso - lists:any(fun(ReasonCode) -> - ReasonCode =:= ?RC_NOT_AUTHORIZED - end, ReasonCodes) of - true -> - handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel); - false -> - handle_out(suback, {PacketId, ReasonCodes}, NChannel) - end; + ok -> + TopicFilters0 = parse_topic_filters(TopicFilters), + TupleTopicFilters0 = check_sub_acls(TopicFilters0, Channel), + case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso + lists:any(fun({_TopicFilter, ReasonCode}) -> + ReasonCode =:= ?RC_NOT_AUTHORIZED + end, TupleTopicFilters0) of + true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); + false -> + Replace = fun + _Fun(TupleList, [ Tuple = {Key, _Value} | More]) -> + _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More); + _Fun(TupleList, []) -> TupleList + end, + TopicFilters1 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], + TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1), + TopicFilters3 = run_hooks('client.subscribe', + [ClientInfo, Properties], + TopicFilters2), + {TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3, Properties, Channel), + TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1), + ReasonCodes2 = [ ReasonCode || {_TopicFilter, ReasonCode} <- TupleTopicFilters2], + handle_out(suback, {PacketId, ReasonCodes2}, NChannel) + end; {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; @@ -613,15 +621,15 @@ process_subscribe(TopicFilters, SubProps, Channel) -> process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> - case check_subscribe(TopicFilter, SubOpts, Channel) of +process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> + case check_sub_caps(TopicFilter, SubOpts, Channel) of ok -> - {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), - process_subscribe(More, SubProps, NChannel, [RC|Acc]); - {error, RC} -> + {ReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), + process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]); + {error, ReasonCode} -> ?LOG(warning, "Cannot subscribe ~s due to ~s.", - [TopicFilter, emqx_reason_codes:text(RC)]), - process_subscribe(More, SubProps, Channel, [RC|Acc]) + [TopicFilter, emqx_reason_codes:text(ReasonCode)]), + process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) end. do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = @@ -1389,18 +1397,22 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, #channel{clientinfo = #{zone := Zone}}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). -%%-------------------------------------------------------------------- -%% Check Subscribe - -check_subscribe(TopicFilter, SubOpts, Channel) -> - case check_sub_acl(TopicFilter, Channel) of - allow -> check_sub_caps(TopicFilter, SubOpts, Channel); - deny -> {error, ?RC_NOT_AUTHORIZED} - end. - %%-------------------------------------------------------------------- %% Check Sub ACL +check_sub_acls(TopicFilters, Channel) -> + check_sub_acls(TopicFilters, Channel, []). + +check_sub_acls([ TopicFilter = {Topic, _} | More] , Channel, Acc) -> + case check_sub_acl(Topic, Channel) of + allow -> + check_sub_acls(More, Channel, [ {TopicFilter, 0} | Acc]); + deny -> + check_sub_acls(More, Channel, [ {TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) + end; +check_sub_acls([], _Channel, Acc) -> + lists:reverse(Acc). + check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 15f3c6674..f3ee4aade 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -347,8 +347,8 @@ t_process_publish_qos1(_) -> t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), - TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, #{}, channel()). + TopicFilters = [ TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}], + {[{TopicFilter, ?RC_SUCCESS}], _Channel} = emqx_channel:process_subscribe(TopicFilters, #{}, channel()). t_process_unsubscribe(_) -> ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), @@ -650,10 +650,11 @@ t_check_pub_alias(_) -> Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()), ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). -t_check_subscribe(_) -> +t_check_sub_acls(_) -> ok = meck:new(emqx_zone, [passthrough, no_history]), ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), - ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()), + TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, + [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()), ok = meck:unload(emqx_zone). t_enrich_connack_caps(_) ->