Fix the deliver packet bug and add test case
This commit is contained in:
parent
a5f9466040
commit
ec03f8e1fa
|
@ -392,8 +392,8 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PSta
|
||||||
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
|
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||||
case deliver({puback, PacketId, ReasonCode}, PState) of
|
case deliver({puback, PacketId, ReasonCode}, PState) of
|
||||||
{ok, _PState} ->
|
{ok, PState1} ->
|
||||||
do_acl_deny_action(Packet, ReasonCode, PState);
|
do_acl_deny_action(Packet, ReasonCode, PState1);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end
|
end
|
||||||
|
@ -406,9 +406,9 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PSta
|
||||||
{error, ReasonCode} ->
|
{error, ReasonCode} ->
|
||||||
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
|
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||||
case deliver({pubrec, PacketId, ?RC_NOT_AUTHORIZED}, PState) of
|
case deliver({pubrec, PacketId, ReasonCode}, PState) of
|
||||||
{ok, _PState} ->
|
{ok, PState1} ->
|
||||||
do_acl_deny_action(Packet, ReasonCode, PState);
|
do_acl_deny_action(Packet, ReasonCode, PState1);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end
|
end
|
||||||
|
@ -472,8 +472,12 @@ process_packet(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters)
|
||||||
{SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)},
|
{SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)},
|
||||||
?LOG(warning, "Cannot subscribe ~p for ~p",
|
?LOG(warning, "Cannot subscribe ~p for ~p",
|
||||||
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
|
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
|
||||||
deliver({suback, PacketId, ReasonCodes}, PState),
|
case deliver({suback, PacketId, ReasonCodes}, PState) of
|
||||||
do_acl_deny_action(Packet, ReasonCodes, PState)
|
{ok, PState1} ->
|
||||||
|
do_acl_deny_action(Packet, ReasonCodes, PState1);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||||
|
|
|
@ -33,6 +33,67 @@
|
||||||
username = <<"emqx">>,
|
username = <<"emqx">>,
|
||||||
password = <<"public">>})).
|
password = <<"public">>})).
|
||||||
|
|
||||||
|
-record(pstate, {
|
||||||
|
zone,
|
||||||
|
sendfun,
|
||||||
|
peername,
|
||||||
|
peercert,
|
||||||
|
proto_ver,
|
||||||
|
proto_name,
|
||||||
|
client_id,
|
||||||
|
is_assigned,
|
||||||
|
conn_pid,
|
||||||
|
conn_props,
|
||||||
|
ack_props,
|
||||||
|
username,
|
||||||
|
session,
|
||||||
|
clean_start,
|
||||||
|
topic_aliases,
|
||||||
|
packet_size,
|
||||||
|
will_topic,
|
||||||
|
will_msg,
|
||||||
|
keepalive,
|
||||||
|
mountpoint,
|
||||||
|
is_super,
|
||||||
|
is_bridge,
|
||||||
|
enable_ban,
|
||||||
|
enable_acl,
|
||||||
|
acl_deny_action,
|
||||||
|
recv_stats,
|
||||||
|
send_stats,
|
||||||
|
connected,
|
||||||
|
connected_at,
|
||||||
|
ignore_loop,
|
||||||
|
topic_alias_maximum
|
||||||
|
}).
|
||||||
|
|
||||||
|
|
||||||
|
-define(TEST_PSTATE(ProtoVer, SendStats),
|
||||||
|
#pstate{zone = test,
|
||||||
|
sendfun = fun(_Packet, _Options) -> ok end,
|
||||||
|
peername = test_peername,
|
||||||
|
peercert = test_peercert,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
client_id = <<"test_pstate">>,
|
||||||
|
is_assigned = false,
|
||||||
|
conn_pid = self(),
|
||||||
|
username = <<"emqx">>,
|
||||||
|
is_super = false,
|
||||||
|
clean_start = false,
|
||||||
|
topic_aliases = #{},
|
||||||
|
packet_size = 1000,
|
||||||
|
mountpoint = <<>>,
|
||||||
|
is_bridge = false,
|
||||||
|
enable_ban = false,
|
||||||
|
enable_acl = true,
|
||||||
|
acl_deny_action = disconnect,
|
||||||
|
recv_stats = #{msg => 0, pkt => 0},
|
||||||
|
send_stats = SendStats,
|
||||||
|
connected = false,
|
||||||
|
ignore_loop = false,
|
||||||
|
topic_alias_maximum = #{to_client => 0, from_client => 0}}).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
{group, mqtt_common},
|
{group, mqtt_common},
|
||||||
|
@ -55,7 +116,9 @@ groups() ->
|
||||||
subscribe_v5]},
|
subscribe_v5]},
|
||||||
{acl,
|
{acl,
|
||||||
[sequence],
|
[sequence],
|
||||||
[acl_deny_action]}].
|
[
|
||||||
|
acl_deny_action_ct,
|
||||||
|
acl_deny_action_eunit]}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
[start_apps(App, SchemaFile, ConfigFile) ||
|
[start_apps(App, SchemaFile, ConfigFile) ||
|
||||||
|
@ -507,7 +570,7 @@ raw_recv_parse(P, ProtoVersion) ->
|
||||||
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||||
version => ProtoVersion}}).
|
version => ProtoVersion}}).
|
||||||
|
|
||||||
acl_deny_action(_) ->
|
acl_deny_action_ct(_) ->
|
||||||
emqx_zone:set_env(external, acl_deny_action, disconnect),
|
emqx_zone:set_env(external, acl_deny_action, disconnect),
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
|
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
|
||||||
|
@ -515,6 +578,14 @@ acl_deny_action(_) ->
|
||||||
emqx_zone:set_env(external, acl_deny_action, ignore),
|
emqx_zone:set_env(external, acl_deny_action, ignore),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
acl_deny_action_eunit(_) ->
|
||||||
|
PState = ?TEST_PSTATE(?MQTT_PROTO_V5, #{msg => 0, pkt => 0}),
|
||||||
|
CodeName = emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ?MQTT_PROTO_V5),
|
||||||
|
{error, CodeName, NEWPSTATE1} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_1, <<"acl_deny_action">>, 1, <<"payload">>), PState),
|
||||||
|
?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE1#pstate.send_stats),
|
||||||
|
{error, CodeName, NEWPSTATE2} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState),
|
||||||
|
?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats).
|
||||||
|
|
||||||
will_check(_) ->
|
will_check(_) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
will_topic_check(0),
|
will_topic_check(0),
|
||||||
|
|
Loading…
Reference in New Issue