fix(test): update test cases for emqx_channel_SUITE

This commit is contained in:
Shawn 2021-07-10 14:29:45 +08:00
parent 9cda6ab3c8
commit 4c122d0722
2 changed files with 159 additions and 17 deletions

View File

@ -207,7 +207,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Protocol = maps:get(protocol, ConnInfo, mqtt), Protocol = maps:get(protocol, ConnInfo, mqtt),
MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of
"" -> undefined; <<>> -> undefined;
MP -> MP MP -> MP
end, end,
QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]), QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]),

View File

@ -24,7 +24,149 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE). all() ->
emqx_ct:all(?MODULE).
mqtt_conf() ->
#{await_rel_timeout => 300,
idle_timeout => 15000,
ignore_loop_deliver => false,
keepalive_backoff => 0.75,
max_awaiting_rel => 100,
max_clientid_len => 65535,
max_inflight => 32,
max_mqueue_len => 1000,
max_packet_size => 1048576,
max_qos_allowed => 2,
max_subscriptions => infinity,
max_topic_alias => 65535,
max_topic_levels => 65535,
mountpoint => <<>>,
mqueue_default_priority => highest,
mqueue_priorities => [],
mqueue_store_qos0 => true,
peer_cert_as_clientid => disabled,
peer_cert_as_username => disabled,
response_information => [],
retain_available => true,
retry_interval => 30,
server_keepalive => disabled,
session_expiry_interval => 7200,
shared_subscription => true,
strict_mode => false,
upgrade_qos => false,
use_username_as_clientid => false,
wildcard_subscription => true}.
listener_mqtt_tcp_conf() ->
#{acceptors => 16,
access_rules => ["allow all"],
bind => {{0,0,0,0},1883},
max_connections => 1024000,
proxy_protocol => false,
proxy_protocol_timeout => 3000,
rate_limit =>
#{conn_bytes_in =>
["100KB","10s"],
conn_messages_in =>
["100","10s"],
max_conn_rate => 1000,
quota =>
#{conn_messages_routing => infinity,
overall_messages_routing => infinity}},
tcp =>
#{active_n => 100,
backlog => 1024,
buffer => 4096,
high_watermark => 1048576,
send_timeout => 15000,
send_timeout_close =>
true},
type => tcp}.
listener_mqtt_ws_conf() ->
#{acceptors => 16,
access_rules => ["allow all"],
bind => {{0,0,0,0},8083},
max_connections => 1024000,
proxy_protocol => false,
proxy_protocol_timeout => 3000,
rate_limit =>
#{conn_bytes_in =>
["100KB","10s"],
conn_messages_in =>
["100","10s"],
max_conn_rate => 1000,
quota =>
#{conn_messages_routing => infinity,
overall_messages_routing => infinity}},
tcp =>
#{active_n => 100,
backlog => 1024,
buffer => 4096,
high_watermark => 1048576,
send_timeout => 15000,
send_timeout_close =>
true},
type => ws,
websocket =>
#{allow_origin_absence =>
true,
check_origin_enable =>
false,
check_origins => [],
compress => false,
deflate_opts =>
#{client_max_window_bits =>
15,
mem_level => 8,
server_max_window_bits =>
15},
fail_if_no_subprotocol =>
true,
idle_timeout => 86400000,
max_frame_size => infinity,
mqtt_path => "/mqtt",
mqtt_piggyback => multiple,
proxy_address_header =>
"x-forwarded-for",
proxy_port_header =>
"x-forwarded-port",
supported_subprotocols =>
["mqtt","mqtt-v3",
"mqtt-v3.1.1",
"mqtt-v5"]}}.
default_zone_conf() ->
#{zones =>
#{default =>
#{ acl => #{
cache => #{enable => true,max_size => 32, ttl => 60000},
deny_action => ignore,
enable => false
},
auth => #{enable => false},
overall_max_connections => infinity,
stats => #{enable => true},
conn_congestion =>
#{enable_alarm => true, min_alarm_sustain_duration => 60000},
flapping_detect =>
#{ban_time => 300000,enable => true,
max_count => 15,window_time => 60000},
force_gc =>
#{bytes => 16777216,count => 16000,
enable => true},
force_shutdown =>
#{enable => true,
max_heap_size => 4194304,
max_message_queue_len => 1000},
mqtt => mqtt_conf(),
listeners =>
#{mqtt_tcp => listener_mqtt_tcp_conf(),
mqtt_ws => listener_mqtt_ws_conf()}
}
}
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% CT Callbacks %% CT Callbacks
@ -50,6 +192,9 @@ init_per_suite(Config) ->
ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
%% Ban
meck:new(emqx_banned, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_banned, check, fun(_ConnInfo) -> false end),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
@ -62,11 +207,10 @@ end_per_suite(_Config) ->
]). ]).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
meck:new(emqx_zone, [passthrough, no_history, no_link]), emqx_config:put(default_zone_conf()),
Config. Config.
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
meck:unload([emqx_zone]),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -241,7 +385,7 @@ t_bad_receive_maximum(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
{shutdown, protocol_error, _, _} = {shutdown, protocol_error, _, _} =
emqx_channel:handle_in( emqx_channel:handle_in(
@ -254,8 +398,8 @@ t_override_client_receive_maximum(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test),
ok = meck:expect(emqx_zone, max_inflight, fun(_) -> 0 end), emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_inflight], 0),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
ClientCapacity = 2, ClientCapacity = 2,
{ok, [{event, connected}, _ConnAck], C2} = {ok, [{event, connected}, _ConnAck], C2} =
@ -506,7 +650,7 @@ t_handle_out_connack_response_information(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {ok, [{event, connected},
{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}],
@ -520,7 +664,7 @@ t_handle_out_connack_not_response_information(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
emqx_channel:handle_in( emqx_channel:handle_in(
@ -660,9 +804,6 @@ t_enrich_conninfo(_) ->
t_enrich_client(_) -> t_enrich_client(_) ->
{ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()). {ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()).
t_check_banned(_) ->
ok = emqx_channel:check_banned(connpkt(), channel()).
t_auth_connect(_) -> t_auth_connect(_) ->
{ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()). {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()).
@ -709,7 +850,7 @@ t_packing_alias(_) ->
channel())). channel())).
t_check_pub_acl(_) -> t_check_pub_acl(_) ->
ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
ok = emqx_channel:check_pub_acl(Publish, channel()). ok = emqx_channel:check_pub_acl(Publish, channel()).
@ -719,7 +860,7 @@ t_check_pub_alias(_) ->
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
t_check_sub_acls(_) -> t_check_sub_acls(_) ->
ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true),
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
[{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()). [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()).
@ -763,7 +904,7 @@ t_ws_cookie_init(_) ->
conn_mod => emqx_ws_connection, conn_mod => emqx_ws_connection,
ws_cookie => WsCookie ws_cookie => WsCookie
}, },
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), Channel = emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -788,7 +929,7 @@ channel(InitFields) ->
maps:fold(fun(Field, Value, Channel) -> maps:fold(fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel) emqx_channel:set_field(Field, Value, Channel)
end, end,
emqx_channel:init(ConnInfo, [{zone, zone}]), emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}),
maps:merge(#{clientinfo => clientinfo(), maps:merge(#{clientinfo => clientinfo(),
session => session(), session => session(),
conn_state => connected conn_state => connected
@ -796,7 +937,8 @@ channel(InitFields) ->
clientinfo() -> clientinfo(#{}). clientinfo() -> clientinfo(#{}).
clientinfo(InitProps) -> clientinfo(InitProps) ->
maps:merge(#{zone => zone, maps:merge(#{zone => default,
listener => mqtt_tcp,
protocol => mqtt, protocol => mqtt,
peerhost => {127,0,0,1}, peerhost => {127,0,0,1},
clientid => <<"clientid">>, clientid => <<"clientid">>,