diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 19d1d9d09..f82e8dcad 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -392,8 +392,8 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PSta ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({puback, PacketId, ReasonCode}, PState) of - {ok, _PState} -> - do_acl_deny_action(Packet, ReasonCode, PState); + {ok, PState1} -> + do_acl_deny_action(Packet, ReasonCode, PState1); Error -> Error end @@ -406,9 +406,9 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PSta {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - case deliver({pubrec, PacketId, ?RC_NOT_AUTHORIZED}, PState) of - {ok, _PState} -> - do_acl_deny_action(Packet, ReasonCode, PState); + case deliver({pubrec, PacketId, ReasonCode}, PState) of + {ok, PState1} -> + do_acl_deny_action(Packet, ReasonCode, PState1); Error -> Error end @@ -472,8 +472,12 @@ process_packet(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters) {SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)}, ?LOG(warning, "Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), - deliver({suback, PacketId, ReasonCodes}, PState), - do_acl_deny_action(Packet, ReasonCodes, PState) + case deliver({suback, PacketId, ReasonCodes}, PState) of + {ok, PState1} -> + do_acl_deny_action(Packet, ReasonCodes, PState1); + Error -> + Error + end end; process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 571019a15..eaad67a4c 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -33,6 +33,67 @@ username = <<"emqx">>, password = <<"public">>})). +-record(pstate, { + zone, + sendfun, + peername, + peercert, + proto_ver, + proto_name, + client_id, + is_assigned, + conn_pid, + conn_props, + ack_props, + username, + session, + clean_start, + topic_aliases, + packet_size, + will_topic, + will_msg, + keepalive, + mountpoint, + is_super, + is_bridge, + enable_ban, + enable_acl, + acl_deny_action, + recv_stats, + send_stats, + connected, + connected_at, + ignore_loop, + topic_alias_maximum + }). + + +-define(TEST_PSTATE(ProtoVer, SendStats), + #pstate{zone = test, + sendfun = fun(_Packet, _Options) -> ok end, + peername = test_peername, + peercert = test_peercert, + proto_ver = ProtoVer, + proto_name = <<"MQTT">>, + client_id = <<"test_pstate">>, + is_assigned = false, + conn_pid = self(), + username = <<"emqx">>, + is_super = false, + clean_start = false, + topic_aliases = #{}, + packet_size = 1000, + mountpoint = <<>>, + is_bridge = false, + enable_ban = false, + enable_acl = true, + acl_deny_action = disconnect, + recv_stats = #{msg => 0, pkt => 0}, + send_stats = SendStats, + connected = false, + ignore_loop = false, + topic_alias_maximum = #{to_client => 0, from_client => 0}}). + all() -> [ {group, mqtt_common}, @@ -55,7 +116,9 @@ groups() -> subscribe_v5]}, {acl, [sequence], - [acl_deny_action]}]. + [ + acl_deny_action_ct, + acl_deny_action_eunit]}]. init_per_suite(Config) -> [start_apps(App, SchemaFile, ConfigFile) || @@ -507,7 +570,7 @@ raw_recv_parse(P, ProtoVersion) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ProtoVersion}}). -acl_deny_action(_) -> +acl_deny_action_ct(_) -> emqx_zone:set_env(external, acl_deny_action, disconnect), process_flag(trap_exit, true), [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], @@ -515,6 +578,14 @@ acl_deny_action(_) -> emqx_zone:set_env(external, acl_deny_action, ignore), ok. +acl_deny_action_eunit(_) -> + PState = ?TEST_PSTATE(?MQTT_PROTO_V5, #{msg => 0, pkt => 0}), + CodeName = emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ?MQTT_PROTO_V5), + {error, CodeName, NEWPSTATE1} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_1, <<"acl_deny_action">>, 1, <<"payload">>), PState), + ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE1#pstate.send_stats), + {error, CodeName, NEWPSTATE2} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState), + ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats). + will_check(_) -> process_flag(trap_exit, true), will_topic_check(0),