1282 lines
47 KiB
Erlang
1282 lines
47 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_channel_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
%% Common Test callback. The list of test cases is obtained from the shared
%% helper, which is handed this suite's module name.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Fixture map stored under the `force_gc' key of basic_conf/0.
force_gc_conf() ->
    #{
        enable => true,
        count => 16000,
        bytes => 16777216
    }.
|
|
|
|
%% Fixture map stored under the `force_shutdown' key of basic_conf/0.
force_shutdown_conf() ->
    #{
        enable => true,
        max_message_queue_len => 1000,
        max_heap_size => 4194304
    }.
|
|
|
|
%% Fixture map stored under the `rpc' key of basic_conf/0.
%% Built from a key/value list; the resulting map has 15 entries.
rpc_conf() ->
    maps:from_list([
        {mode, async},
        {port_discovery, stateless},
        {async_batch_size, 256},
        {tcp_client_num, 1},
        {tcp_server_port, 5369},
        %% *_timeout keys
        {authentication_timeout, 5000},
        {call_receive_timeout, 15000},
        {connect_timeout, 5000},
        {send_timeout, 5000},
        %% socket_* keys
        {socket_buffer, 1048576},
        {socket_recbuf, 1048576},
        {socket_sndbuf, 1048576},
        {socket_keepalive_count, 9},
        {socket_keepalive_idle, 900},
        {socket_keepalive_interval, 75}
    ]).
|
|
|
|
%% Fixture map stored under the `mqtt' key of basic_conf/0 (28 entries).
%% Keys are grouped by name prefix / value kind purely for readability.
mqtt_conf() ->
    #{
        %% max_* keys
        max_awaiting_rel => 100,
        max_clientid_len => 65535,
        max_inflight => 32,
        max_mqueue_len => 1000,
        max_packet_size => 1048576,
        max_qos_allowed => 2,
        max_subscriptions => infinity,
        max_topic_alias => 65535,
        max_topic_levels => 128,
        %% mqueue_* keys
        mqueue_default_priority => lowest,
        mqueue_priorities => disabled,
        mqueue_store_qos0 => true,
        %% timeout / interval / keepalive keys
        await_rel_timeout => 300000,
        idle_timeout => 15000,
        retry_interval => 30000,
        session_expiry_interval => 7200000,
        keepalive_backoff => 0.75,
        server_keepalive => disabled,
        %% boolean switches
        ignore_loop_deliver => false,
        retain_available => true,
        shared_subscription => true,
        strict_mode => false,
        upgrade_qos => false,
        use_username_as_clientid => false,
        wildcard_subscription => true,
        %% remaining keys
        peer_cert_as_clientid => disabled,
        peer_cert_as_username => disabled,
        response_information => []
    }.
|
|
|
|
%% Fixture map used as the `default' TCP listener in listeners_conf/0.
listener_mqtt_tcp_conf() ->
    TcpOptions = #{
        active_n => 100,
        backlog => 1024,
        buffer => 4096,
        high_watermark => 1048576,
        nodelay => false,
        reuseaddr => true,
        send_timeout => 15000,
        send_timeout_close => true
    },
    #{
        zone => default,
        bind => {{0, 0, 0, 0}, 1883},
        acceptors => 16,
        max_connections => 1024000,
        access_rules => ["allow all"],
        mountpoint => <<>>,
        proxy_protocol => false,
        proxy_protocol_timeout => 3000,
        tcp_options => TcpOptions
    }.
|
|
|
|
%% Fixture map used as the `default' WebSocket listener in listeners_conf/0.
%% The nested maps are built first and then assembled into the listener map.
listener_mqtt_ws_conf() ->
    TcpOptions = #{
        active_n => 100,
        backlog => 1024,
        buffer => 4096,
        high_watermark => 1048576,
        nodelay => false,
        reuseaddr => true,
        send_timeout => 15000,
        send_timeout_close => true
    },
    DeflateOpts = #{
        client_max_window_bits => 15,
        mem_level => 8,
        server_max_window_bits => 15
    },
    WebSocket = #{
        allow_origin_absence => true,
        check_origin_enable => false,
        check_origins => [],
        compress => false,
        deflate_opts => DeflateOpts,
        fail_if_no_subprotocol => true,
        idle_timeout => 86400000,
        max_frame_size => infinity,
        mqtt_path => "/mqtt",
        mqtt_piggyback => multiple,
        % should allow uppercase in config
        proxy_address_header => "X-Forwarded-For",
        proxy_port_header => "x-forwarded-port",
        supported_subprotocols => ["mqtt", "mqtt-v3", "mqtt-v3.1.1", "mqtt-v5"]
    },
    #{
        zone => default,
        bind => {{0, 0, 0, 0}, 8083},
        acceptors => 16,
        max_connections => 1024000,
        access_rules => ["allow all"],
        mountpoint => <<>>,
        proxy_protocol => false,
        proxy_protocol_timeout => 3000,
        tcp_options => TcpOptions,
        websocket => WebSocket
    }.
|
|
|
|
%% Fixture map stored under the `listeners' key of basic_conf/0: one
%% `default' listener for each of the `tcp' and `ws' types.
listeners_conf() ->
    Tcp = #{default => listener_mqtt_tcp_conf()},
    Ws = #{default => listener_mqtt_ws_conf()},
    #{tcp => Tcp, ws => Ws}.
|
|
|
|
%% Fixture map stored under the `limiter' key of basic_conf/0.
%% Every listed limiter name maps to the same settings map.
limiter_conf() ->
    Names = [bytes_in, message_in, message_routing, connection, internal],
    Settings = #{burst => 0, rate => infinity, capacity => infinity},
    maps:from_list([{Name, Settings} || Name <- Names]).
|
|
|
|
%% Fixture map stored under the `stats' key of basic_conf/0.
stats_conf() ->
    maps:from_list([{enable, true}]).
|
|
|
|
%% Fixture map stored under the `zones' key of basic_conf/0; intentionally empty.
zone_conf() ->
    maps:new().
|
|
|
|
%% Assembles the complete test configuration from the per-section fixtures.
basic_conf() ->
    maps:from_list([
        {force_gc, force_gc_conf()},
        {force_shutdown, force_shutdown_conf()},
        {mqtt, mqtt_conf()},
        {rpc, rpc_conf()},
        {stats, stats_conf()},
        {listeners, listeners_conf()},
        {zones, zone_conf()},
        {limiter, limiter_conf()}
    ]).
|
|
|
|
%% Installs basic_conf/0 via emqx_config:put/1 and returns whatever
%% emqx_config:get([], #{}) reported beforehand, so the caller can restore it.
set_test_listener_confs() ->
    Previous = emqx_config:get([], #{}),
    TestConf = basic_conf(),
    emqx_config:put(TestConf),
    Previous.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% CT Callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Suite setup: installs the mocks shared by every test case.
%% All mocks use the same meck options, and every meck:new/2 result is
%% asserted to be `ok' so a failed mock setup aborts the suite immediately
%% (previously the emqx_banned mock was the only one left unchecked).
%% Returns Config unchanged.
init_per_suite(Config) ->
    MeckOpts = [passthrough, no_history, no_link],
    %% CM Meck
    ok = meck:new(emqx_cm, MeckOpts),
    ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
    ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
    %% Broker Meck
    ok = meck:new(emqx_broker, MeckOpts),
    %% Hooks Meck
    ok = meck:new(emqx_hooks, MeckOpts),
    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
    %% Session Meck
    ok = meck:new(emqx_session, MeckOpts),
    %% Metrics
    ok = meck:new(emqx_metrics, MeckOpts),
    ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
    ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
    %% Ban
    ok = meck:new(emqx_banned, MeckOpts),
    ok = meck:expect(emqx_banned, check, fun(_ConnInfo) -> false end),
    Config.
|
|
|
|
%% Suite teardown: unloads every module mocked in init_per_suite/1.
end_per_suite(_Config) ->
    Mocked = [
        emqx_metrics,
        emqx_session,
        emqx_broker,
        emqx_hooks,
        emqx_cm,
        emqx_banned
    ],
    meck:unload(Mocked).
|
|
|
|
%% Per-test setup: mocks emqx_access_control so authentication succeeds as a
%% non-superuser and authorization always allows, installs the test
%% configuration, starts the apps, and stores the previous configuration
%% under the `config' key for end_per_testcase/2.
init_per_testcase(_TestCase, Config) ->
    %% Access Control Meck
    Mod = emqx_access_control,
    ok = meck:new(Mod, [passthrough, no_history, no_link]),
    Authenticate = fun(_) -> {ok, #{is_superuser => false}} end,
    Authorize = fun(_, _, _) -> allow end,
    ok = meck:expect(Mod, authenticate, Authenticate),
    ok = meck:expect(Mod, authorize, Authorize),
    %% Set confs
    SavedConf = set_test_listener_confs(),
    emqx_common_test_helpers:start_apps([]),
    [{config, SavedConf} | Config].
|
|
|
|
%% Per-test teardown: removes the access-control mock, puts back the
%% configuration saved by init_per_testcase/2 and stops the apps.
end_per_testcase(_TestCase, Config) ->
    meck:unload([emqx_access_control]),
    SavedConf = ?config(config, Config),
    emqx_config:put(SavedConf),
    emqx_common_test_helpers:stop_apps([]),
    Config.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for channel info/stats/caps
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% info/1 of the default test channel reports `connected' and the same
%% client info that clientinfo/0 produces.
t_chan_info(_) ->
    Info = emqx_channel:info(channel()),
    #{conn_state := connected, clientinfo := ClientInfo} = Info,
    ?assertEqual(clientinfo(), ClientInfo).
|
|
|
|
%% caps/1 of the default test channel must expose the listed capabilities;
%% max_topic_levels only has to be an integer.
t_chan_caps(_) ->
    Caps = emqx_channel:caps(channel()),
    ?assertMatch(
        #{
            max_clientid_len := 65535,
            max_qos_allowed := 2,
            max_topic_alias := 65535,
            max_topic_levels := Level,
            retain_available := true,
            shared_subscription := true,
            subscription_identifiers := true,
            wildcard_subscription := true
        } when is_integer(Level),
        Caps
    ).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for channel handle_in
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% A CONNECT on an idle channel (with open_session mocked to hand out a fresh
%% session) yields a `connected' event plus a successful CONNACK, and the
%% resulting channel carries the expected client id, username and state.
t_handle_in_connect_packet_sucess(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    Idle = channel(#{conn_state => idle}),
    Connect = ?CONNECT_PACKET(connpkt()),
    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Connected} =
        emqx_channel:handle_in(Connect, Idle),
    ?assertMatch(
        #{clientid := <<"clientid">>, username := <<"username">>},
        emqx_channel:info(clientinfo, Connected)
    ),
    ?assertEqual(connected, emqx_channel:info(conn_state, Connected)).
|
|
|
|
%% A CONNECT arriving on an already connected channel is answered with a
%% protocol-error DISCONNECT and a close; the channel must come back unchanged.
t_handle_in_unexpected_connect_packet(_) ->
    Connected = emqx_channel:set_field(conn_state, connected, channel()),
    Disconnect = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
    Expected = {ok, [{outgoing, Disconnect}, {close, protocol_error}], Connected},
    Expected = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Connected).
|
|
|
|
%% A PUBLISH arriving on an idle channel is answered with a protocol-error
%% DISCONNECT and a close; the channel must come back unchanged.
t_handle_in_unexpected_packet(_) ->
    Idle = emqx_channel:set_field(conn_state, idle, channel()),
    Disconnect = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
    Expected = {ok, [{outgoing, Disconnect}, {close, protocol_error}], Idle},
    Expected = emqx_channel:handle_in(?PUBLISH_PACKET(?QOS_0), Idle).
|
|
|
|
% t_handle_in_connect_auth_failed(_) ->
|
|
% ConnPkt = #mqtt_packet_connect{
|
|
% proto_name = <<"MQTT">>,
|
|
% proto_ver = ?MQTT_PROTO_V5,
|
|
% is_bridge = false,
|
|
% clean_start = true,
|
|
% keepalive = 30,
|
|
% properties = #{
|
|
% 'Authentication-Method' => <<"failed_auth_method">>,
|
|
% 'Authentication-Data' => <<"failed_auth_data">>
|
|
% },
|
|
% clientid = <<"clientid">>,
|
|
% username = <<"username">>
|
|
% },
|
|
% {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
|
% emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), channel(#{conn_state => idle})).
|
|
|
|
%% AUTH(continue authentication) handling in two states:
%%  * on a connected channel it is a protocol error and the channel is unchanged;
%%  * on a connecting channel whose conn_props carry the same properties it
%%    completes the connection with a successful CONNACK.
t_handle_in_continue_auth(_) ->
    AuthProps = #{
        'Authentication-Method' => <<"failed_auth_method">>,
        'Authentication-Data' => <<"failed_auth_data">>
    },
    AuthPkt = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, AuthProps),

    Connected = channel(#{conn_state => connected}),
    {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR)}, {close, protocol_error}], Connected} =
        emqx_channel:handle_in(AuthPkt, Connected),

    Connecting = channel(#{conn_state => connecting}),
    ConnInfo0 = emqx_channel:info(conninfo, Connecting),
    ConnInfo1 = ConnInfo0#{conn_props => AuthProps},
    WithProps = emqx_channel:set_field(conninfo, ConnInfo1, Connecting),

    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _} =
        emqx_channel:handle_in(AuthPkt, WithProps).
|
|
|
|
%% AUTH(re-authenticate) handling:
%%  * on the default channel, and on a channel whose conn_props are undefined,
%%    the reply is a bad-authentication-method DISCONNECT plus close;
%%  * once conn_props carry the same properties, the reply is a successful AUTH.
t_handle_in_re_auth(_) ->
    AuthProps = #{
        'Authentication-Method' => <<"failed_auth_method">>,
        'Authentication-Data' => <<"failed_auth_data">>
    },
    ReAuth = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, AuthProps),

    {ok,
        [
            {outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)},
            {close, bad_authentication_method}
        ],
        _} = emqx_channel:handle_in(ReAuth, channel()),

    NoProps = channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => undefined}}),
    {ok,
        [
            {outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)},
            {close, bad_authentication_method}
        ],
        _} = emqx_channel:handle_in(ReAuth, NoProps),

    Base = channel(),
    ConnInfo0 = emqx_channel:info(conninfo, Base),
    WithProps = emqx_channel:set_field(conninfo, ConnInfo0#{conn_props => AuthProps}, Base),

    {ok, ?AUTH_PACKET(?RC_SUCCESS), _} = emqx_channel:handle_in(ReAuth, WithProps).
|
|
|
|
%% A QoS 0 PUBLISH on a connected channel produces no reply packet.
t_handle_in_qos0_publish(_) ->
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Connected = channel(#{conn_state => connected}),
    Pkt = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
    {ok, _NChannel} = emqx_channel:handle_in(Pkt, Connected).
|
|
|
|
%% A QoS 1 PUBLISH whose (mocked) broker publish returns no routes is
%% acknowledged with PUBACK / no-matching-subscribers.
t_handle_in_qos1_publish(_) ->
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Pkt = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
    Connected = channel(#{conn_state => connected}),
    {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
        emqx_channel:handle_in(Pkt, Connected).
|
|
|
|
%% Two QoS 2 PUBLISHes in a row: the first is routed (PUBREC success), the
%% second finds no subscribers; afterwards the channel stats report two
%% messages awaiting release.
t_handle_in_qos2_publish(_) ->
    Routed = fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end,
    ok = meck:expect(emqx_broker, publish, Routed),
    Chan0 = channel(#{conn_state => connected, session => session()}),
    %% waiting limiter server
    timer:sleep(200),
    First = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
    {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chan1} =
        emqx_channel:handle_in(First, Chan0),
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Second = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
    {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Chan2} =
        emqx_channel:handle_in(Second, Chan1),
    Stats = emqx_channel:stats(Chan2),
    ?assertEqual(2, proplists:get_value(awaiting_rel_cnt, Stats)).
|
|
|
|
%% QoS 2 error paths with a session limited to two awaiting-rel entries, one
%% of which (packet id 1) is already taken:
%%  * id 1 again   -> PUBREC packet-identifier-in-use, channel unchanged;
%%  * id 2         -> PUBREC no-matching-subscribers;
%%  * id 3 on top  -> DISCONNECT receive-maximum-exceeded, channel unchanged.
t_handle_in_qos2_publish_with_error_return(_) ->
    ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Sess = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
    Chan0 = channel(#{conn_state => connected, session => Sess}),
    %% waiting limiter server
    timer:sleep(200),
    Dup = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
    {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Chan0} =
        emqx_channel:handle_in(Dup, Chan0),
    Fresh = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
    {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Chan1} =
        emqx_channel:handle_in(Fresh, Chan0),
    Overflow = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
    {ok,
        [
            {outgoing, ?DISCONNECT_PACKET(?RC_RECEIVE_MAXIMUM_EXCEEDED)},
            {close, receive_maximum_exceeded}
        ],
        Chan1} = emqx_channel:handle_in(Overflow, Chan1).
|
|
|
|
%% A PUBACK that the (mocked) session accepts produces no reply packet.
t_handle_in_puback_ok(_) ->
    Acked = emqx_message:make(<<"t">>, <<"payload">>),
    PubAckOk = fun(_, _PacketId, Session) -> {ok, Acked, Session} end,
    ok = meck:expect(emqx_session, puback, PubAckOk),
    Connected = channel(#{conn_state => connected}),
    PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
    {ok, _NChannel} = emqx_channel:handle_in(PubAck, Connected).
|
|
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
%% A PUBACK rejected by the (mocked) session with packet-identifier-in-use
%% still yields a plain {ok, Channel}.
t_handle_in_puback_id_in_use(_) ->
    InUse = fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end,
    ok = meck:expect(emqx_session, puback, InUse),
    PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
    {ok, _Channel} = emqx_channel:handle_in(PubAck, channel()).
|
|
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
%% A PUBACK rejected by the (mocked) session with packet-identifier-not-found
%% still yields a plain {ok, Channel}.
t_handle_in_puback_id_not_found(_) ->
    NotFound = fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end,
    ok = meck:expect(emqx_session, puback, NotFound),
    PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
    {ok, _Channel} = emqx_channel:handle_in(PubAck, channel()).
|
|
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
%% A CONNECT carrying 'Receive-Maximum' => 0 is refused with a
%% protocol_error shutdown.
t_bad_receive_maximum(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    emqx_config:put_zone_conf(default, [mqtt, response_information], test),
    Idle = channel(#{conn_state => idle}),
    Connect = ?CONNECT_PACKET(connpkt(#{'Receive-Maximum' => 0})),
    {shutdown, protocol_error, _, _} = emqx_channel:handle_in(Connect, Idle).
|
|
|
|
%% With the zone's max_inflight set to 0, the 'Receive-Maximum' requested by
%% the client (2) is what ends up in the channel's conninfo.
t_override_client_receive_maximum(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    emqx_config:put_zone_conf(default, [mqtt, response_information], test),
    emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0),
    Idle = channel(#{conn_state => idle}),
    Requested = 2,
    Connect = ?CONNECT_PACKET(connpkt(#{'Receive-Maximum' => Requested})),
    {ok, [{event, connected}, _ConnAck], Connected} =
        emqx_channel:handle_in(Connect, Idle),
    #{} = ConnInfo = emqx_channel:info(conninfo, Connected),
    ?assertEqual(Requested, maps:get(receive_maximum, ConnInfo)).
|
|
|
|
%% A PUBREC accepted by the (mocked) session is answered with PUBREL success.
t_handle_in_pubrec_ok(_) ->
    Stored = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>),
    PubRecOk = fun(_, _, Session) -> {ok, Stored, Session} end,
    ok = meck:expect(emqx_session, pubrec, PubRecOk),
    Connected = channel(#{conn_state => connected}),
    PubRec = ?PUBREC_PACKET(1, ?RC_SUCCESS),
    {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} =
        emqx_channel:handle_in(PubRec, Connected).
|
|
|
|
%% A PUBREC rejected with packet-identifier-in-use is answered with a PUBREL
%% carrying that same reason code.
t_handle_in_pubrec_id_in_use(_) ->
    InUse = fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end,
    ok = meck:expect(emqx_session, pubrec, InUse),
    PubRec = ?PUBREC_PACKET(1, ?RC_SUCCESS),
    {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} =
        emqx_channel:handle_in(PubRec, channel()).
|
|
|
|
%% A PUBREC rejected with packet-identifier-not-found is answered with a
%% PUBREL carrying that same reason code.
t_handle_in_pubrec_id_not_found(_) ->
    NotFound = fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end,
    ok = meck:expect(emqx_session, pubrec, NotFound),
    PubRec = ?PUBREC_PACKET(1, ?RC_SUCCESS),
    {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
        emqx_channel:handle_in(PubRec, channel()).
|
|
|
|
%% A PUBREL accepted by the (mocked) session is answered with PUBCOMP success.
t_handle_in_pubrel_ok(_) ->
    PubRelOk = fun(_, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, pubrel, PubRelOk),
    Connected = channel(#{conn_state => connected}),
    PubRel = ?PUBREL_PACKET(1, ?RC_SUCCESS),
    {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} =
        emqx_channel:handle_in(PubRel, Connected).
|
|
|
|
%% A PUBREL rejected with packet-identifier-not-found is answered with a
%% PUBCOMP carrying that same reason code.
t_handle_in_pubrel_not_found_error(_) ->
    NotFound = fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end,
    ok = meck:expect(emqx_session, pubrel, NotFound),
    PubRel = ?PUBREL_PACKET(1, ?RC_SUCCESS),
    {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
        emqx_channel:handle_in(PubRel, channel()).
|
|
|
|
%% A PUBCOMP accepted by the (mocked) session produces no reply packet.
t_handle_in_pubcomp_ok(_) ->
    PubCompOk = fun(_, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, pubcomp, PubCompOk),
    PubComp = ?PUBCOMP_PACKET(1, ?RC_SUCCESS),
    {ok, _Channel} = emqx_channel:handle_in(PubComp, channel()).
|
|
% ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
%% A PUBCOMP rejected with packet-identifier-not-found still yields a plain
%% {ok, Channel}.
t_handle_in_pubcomp_not_found_error(_) ->
    NotFound = fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end,
    ok = meck:expect(emqx_session, pubcomp, NotFound),
    Connected = channel(#{conn_state => connected}),
    PubComp = ?PUBCOMP_PACKET(1, ?RC_SUCCESS),
    {ok, _Channel1} = emqx_channel:handle_in(PubComp, Connected).
|
|
|
|
%% A SUBSCRIBE to <<"+">> with default options is answered with a SUBACK
%% granting QoS 0, followed by an `updated' event.
t_handle_in_subscribe(_) ->
    SubscribeOk = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, subscribe, SubscribeOk),
    Connected = channel(#{conn_state => connected}),
    Request = ?SUBSCRIBE_PACKET(1, #{}, [{<<"+">>, ?DEFAULT_SUBOPTS}]),
    Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_0])}, {event, updated}],
    {ok, Replies, _Chan} = emqx_channel:handle_in(Request, Connected).
|
|
|
|
%% An UNSUBSCRIBE from <<"+">> is answered with an UNSUBACK followed by an
%% `updated' event.
t_handle_in_unsubscribe(_) ->
    UnsubscribeOk = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, unsubscribe, UnsubscribeOk),
    Connected = channel(#{conn_state => connected}),
    Request = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]),
    {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan} =
        emqx_channel:handle_in(Request, Connected).
|
|
|
|
%% A PINGREQ is answered with a PINGRESP.
t_handle_in_pingreq(_) ->
    PingReq = ?PACKET(?PINGREQ),
    {ok, ?PACKET(?PINGRESP), _Channel} = emqx_channel:handle_in(PingReq, channel()).
|
|
|
|
%% A normal DISCONNECT closes the channel with reason `normal' and leaves no
%% will message on it.
t_handle_in_disconnect(_) ->
    Connected = channel(#{conn_state => connected}),
    Disconnect = ?DISCONNECT_PACKET(?RC_SUCCESS),
    {ok, {close, normal}, Closed} = emqx_channel:handle_in(Disconnect, Connected),
    ?assertEqual(undefined, emqx_channel:info(will_msg, Closed)).
|
|
|
|
%% A bare AUTH packet on a connected channel is a protocol error; the channel
%% must come back unchanged.
t_handle_in_auth(_) ->
    Connected = channel(#{conn_state => connected}),
    Disconnect = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
    Expected = {ok, [{outgoing, Disconnect}, {close, protocol_error}], Connected},
    Expected = emqx_channel:handle_in(?AUTH_PACKET(), Connected).
|
|
|
|
%% frame_too_large handling per connection state:
%%  * idle          -> shutdown without a packet;
%%  * connecting    -> shutdown with a packet-too-large CONNACK;
%%  * connected     -> packet-too-large DISCONNECT plus close;
%%  * disconnected  -> ignored, channel unchanged.
t_handle_in_frame_error(_) ->
    FrameError = {frame_error, frame_too_large},

    Idle = channel(#{conn_state => idle}),
    {shutdown, frame_too_large, _Chan} = emqx_channel:handle_in(FrameError, Idle),

    Connecting = channel(#{conn_state => connecting}),
    ConnAck = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
    {shutdown, frame_too_large, ConnAck, _} = emqx_channel:handle_in(FrameError, Connecting),

    Disconnect = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
    Connected = channel(#{conn_state => connected}),
    {ok, [{outgoing, Disconnect}, {close, frame_too_large}], _} =
        emqx_channel:handle_in(FrameError, Connected),

    Disconnected = channel(#{conn_state => disconnected}),
    {ok, Disconnected} = emqx_channel:handle_in(FrameError, Disconnected).
|
|
|
|
%% Feeding the bare atom `packet' to handle_in/2 results in a protocol-error
%% DISCONNECT plus close.
t_handle_in_expected_packet(_) ->
    Disconnect = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
    Replies = [{outgoing, Disconnect}, {close, protocol_error}],
    {ok, Replies, _Chan} = emqx_channel:handle_in(packet, channel()).
|
|
|
|
%% process_connect/2 on an idle channel (open_session mocked) yields a
%% `connected' event plus a successful CONNACK.
t_process_connect(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    Idle = channel(#{conn_state => idle}),
    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
        emqx_channel:process_connect(#{}, Idle).
|
|
|
|
%% process_publish/2 with a QoS 0 packet produces no reply packet.
t_process_publish_qos0(_) ->
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Pkt = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
    Chan = channel(),
    {ok, _Channel} = emqx_channel:process_publish(Pkt, Chan).
|
|
|
|
%% process_publish/2 with a QoS 1 packet and no routes replies with
%% PUBACK / no-matching-subscribers.
t_process_publish_qos1(_) ->
    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
    Pkt = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
    Chan = channel(),
    {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
        emqx_channel:process_publish(Pkt, Chan).
|
|
|
|
%% process_subscribe/3 pairs the single topic filter with a success code.
t_process_subscribe(_) ->
    SubscribeOk = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, subscribe, SubscribeOk),
    Filter = {<<"+">>, ?DEFAULT_SUBOPTS},
    {[{Filter, ?RC_SUCCESS}], _Channel} =
        emqx_channel:process_subscribe([Filter], #{}, channel()).
|
|
|
|
%% process_unsubscribe/3 returns one success code for the single topic filter.
t_process_unsubscribe(_) ->
    UnsubscribeOk = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, unsubscribe, UnsubscribeOk),
    Filters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
    Chan = channel(),
    {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(Filters, #{}, Chan).
|
|
|
|
%% QoS 0 publishes under a quota: the second publish is dropped (counted via
%% the 'packets.publish.dropped' metric), and after the quota-limit timer is
%% handled a further publish is not counted as dropped.
%%
%% The emqx_metrics mock is suite-wide, and this test narrows inc/1 to a
%% single metric name. Cleanup therefore runs in an `after' section, so a
%% failing assertion cannot leave the narrowed mock, the bucket or the limiter
%% process behind for later test cases.
%% NOTE: as before, the emqx_metrics:val/1 expectation is not removed here.
t_quota_qos0(_) ->
    esockd_limiter:start_link(),
    add_bucket(),
    Cnter = counters:new(1, []),
    try
        ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
        ok = meck:expect(
            emqx_metrics,
            inc,
            fun('packets.publish.dropped') -> counters:add(Cnter, 1, 1) end
        ),
        ok = meck:expect(
            emqx_metrics,
            val,
            fun('packets.publish.dropped') -> counters:get(Cnter, 1) end
        ),
        Chann = channel(#{conn_state => connected, quota => quota()}),
        Pub = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),

        M1 = emqx_metrics:val('packets.publish.dropped'),
        {ok, Chann1} = emqx_channel:handle_in(Pub, Chann),
        {ok, Chann2} = emqx_channel:handle_in(Pub, Chann1),
        %% exactly one of the two publishes above was dropped
        M1 = emqx_metrics:val('packets.publish.dropped') - 1,
        timer:sleep(1000),
        {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
        {ok, _} = emqx_channel:handle_in(Pub, Chann3),
        %% the publish after the timeout did not add another drop
        M1 = emqx_metrics:val('packets.publish.dropped') - 1,
        ok
    after
        %% restore the permissive suite-level metric mocks
        ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
        ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
        del_bucket(),
        esockd_limiter:stop()
    end.
|
|
|
|
%% QoS 1 publishes under a quota: success, then quota-exceeded; after the
%% quota-limit timer is handled one more success, then quota-exceeded again.
%%
%% Bucket removal and limiter shutdown run in an `after' section so they also
%% happen when one of the assertions fails.
t_quota_qos1(_) ->
    esockd_limiter:start_link(),
    add_bucket(),
    try
        ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
        Chann = channel(#{conn_state => connected, quota => quota()}),
        Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
        %% Quota per connections
        {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub, Chann),
        {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub, Chann1),
        {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
        {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
        %% Quota in overall
        {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
        ok
    after
        del_bucket(),
        esockd_limiter:stop()
    end.
|
|
|
|
%% QoS 2 publishes under a quota: success, then quota-exceeded; after the
%% quota-limit timer is handled one more success, then quota-exceeded again.
%%
%% Bucket removal and limiter shutdown run in an `after' section so they also
%% happen when one of the assertions fails.
t_quota_qos2(_) ->
    esockd_limiter:start_link(),
    add_bucket(),
    try
        ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
        Chann = channel(#{conn_state => connected, quota => quota()}),
        Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
        Pub2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
        Pub3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
        Pub4 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 4, <<"payload">>),
        %% Quota per connections
        {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub1, Chann),
        {ok, ?PUBREC_PACKET(2, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub2, Chann1),
        {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
        {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
        %% Quota in overall
        {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
        ok
    after
        del_bucket(),
        esockd_limiter:stop()
    end.
|
|
|
|
%% Kicking a channel whose client has mountpoint <<"prefix/">> must publish
%% the will message on the mounted topic <<"prefix/will_topic">>. The broker
%% mock forwards every published message to the test process.
t_mount_will_msg(_) ->
    TestPid = self(),
    Mounted = clientinfo(#{mountpoint => <<"prefix/">>}),
    WillMsg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
    Chan = channel(#{clientinfo => Mounted, will_msg => WillMsg}),

    Forward = fun(M) -> TestPid ! {pub, M} end,
    ok = meck:expect(emqx_broker, publish, Forward),

    {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} =
        emqx_channel:handle_call(kick, Chan),
    receive
        {pub, #message{topic = <<"prefix/will_topic">>}} -> ok
    after 200 -> exit(will_message_not_published_or_not_correct)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for handle_deliver
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Delivering a QoS 1 and a QoS 2 message yields outgoing packets with the
%% same QoS levels, in the same order.
t_handle_deliver(_) ->
    Specs = [{?QOS_1, <<"t1">>, <<"qos1">>}, {?QOS_2, <<"t2">>, <<"qos2">>}],
    Delivers = [
        {deliver, <<"+">>, emqx_message:make(test, QoS, Topic, Payload)}
     || {QoS, Topic, Payload} <- Specs
    ],
    {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()),
    ?assertEqual([?QOS_1, ?QOS_2], lists:map(fun emqx_packet:qos/1, Packets)).
|
|
|
|
%% A message flagged `nl' that was published by the receiving client itself,
%% on a subscription with nl => 1, produces no outgoing packet.
t_handle_deliver_nl(_) ->
    ClientId = <<"clientid">>,
    Sess = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}),
    Chan = channel(#{clientinfo => clientinfo(#{clientid => ClientId}), session => Sess}),
    Own = emqx_message:set_flag(nl, emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>)),
    {ok, _} = emqx_channel:handle_deliver([{deliver, <<"t1">>, Own}], Chan).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for handle_out
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% handle_out(publish, ...) with two publishes yields two outgoing packets.
t_handle_out_publish(_) ->
    Connected = channel(#{conn_state => connected}),
    Publishes = [
        {undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
        {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}
    ],
    {ok, {outgoing, Packets}, _NChannel} =
        emqx_channel:handle_out(publish, Publishes, Connected),
    ?assertEqual(2, length(Packets)).
|
|
|
|
%% A single QoS 1 publish with packet id 1 becomes the matching PUBLISH packet.
t_handle_out_publish_1(_) ->
    Outgoing = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
    Publishes = [{1, Outgoing}],
    {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan} =
        emqx_channel:handle_out(publish, Publishes, channel()).
|
|
|
|
%% A successful connack reply emits a `connected' event plus the CONNACK and
%% leaves the channel in the `connected' state.
t_handle_out_connack_sucess(_) ->
    Reply = {?RC_SUCCESS, 0, #{}},
    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Connected} =
        emqx_channel:handle_out(connack, Reply, channel()),
    ?assertEqual(connected, emqx_channel:info(conn_state, Connected)).
|
|
|
|
%% When the client asks for response information
%% ('Request-Response-Information' => 1), the CONNACK properties carry the
%% zone's configured 'Response-Information' value.
t_handle_out_connack_response_information(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    emqx_config:put_zone_conf(default, [mqtt, response_information], test),
    Idle = channel(#{conn_state => idle}),
    Connect = ?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})),
    {ok,
        [
            {event, connected},
            {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}
        ],
        _} = emqx_channel:handle_in(Connect, Idle).
|
|
|
|
%% When the client opts out ('Request-Response-Information' => 0), the
%% CONNACK properties must not contain 'Response-Information'.
t_handle_out_connack_not_response_information(_) ->
    OpenSession = fun(true, _ClientInfo, _ConnInfo) ->
        {ok, #{session => session(), present => false}}
    end,
    ok = meck:expect(emqx_cm, open_session, OpenSession),
    emqx_config:put_zone_conf(default, [mqtt, response_information], test),
    Idle = channel(#{conn_state => idle}),
    Connect = ?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})),
    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
        emqx_channel:handle_in(Connect, Idle),
    ?assertEqual(false, maps:is_key('Response-Information', AckProps)).
|
|
|
|
%% A not-authorized connack reply shuts the channel down with that CONNACK.
t_handle_out_connack_failure(_) ->
    Chan = channel(),
    {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
        emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, Chan).
|
|
|
|
%% handle_out(puback, ...) produces the corresponding PUBACK packet.
t_handle_out_puback(_) ->
    Reply = {1, ?RC_SUCCESS},
    {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel} =
        emqx_channel:handle_out(puback, Reply, channel(#{conn_state => connected})).
|
|
|
|
%% handle_out(pubrec, ...) produces the corresponding PUBREC packet.
t_handle_out_pubrec(_) ->
    Reply = {1, ?RC_SUCCESS},
    {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel} =
        emqx_channel:handle_out(pubrec, Reply, channel(#{conn_state => connected})).
|
|
|
|
%% Two consecutive handle_out(pubrel, ...) calls produce PUBREL packets with
%% packet ids 1 and 2; the second call runs on the channel returned by the first.
t_handle_out_pubrel(_) ->
    Chan0 = channel(#{conn_state => connected}),
    {ok, ?PUBREL_PACKET(1), Chan1} =
        emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Chan0),
    {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Chan2} =
        emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Chan1).
|
|
|
|
%% handle_out(pubcomp, ...) produces the corresponding PUBCOMP packet.
t_handle_out_pubcomp(_) ->
    Reply = {1, ?RC_SUCCESS},
    {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel} =
        emqx_channel:handle_out(pubcomp, Reply, channel()).
|
|
|
|
%% handle_out(suback, ...) emits the SUBACK followed by an `updated' event.
t_handle_out_suback(_) ->
    SubAck = ?SUBACK_PACKET(1, [?QOS_2]),
    Replies = [{outgoing, SubAck}, {event, updated}],
    {ok, Replies, _Chan} = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
|
|
|
|
%% handle_out(unsuback, ...) emits the UNSUBACK followed by an `updated' event.
t_handle_out_unsuback(_) ->
    UnsubAck = ?UNSUBACK_PACKET(1, [?RC_SUCCESS]),
    Replies = [{outgoing, UnsubAck}, {event, updated}],
    {ok, Replies, _Chan} = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
|
|
|
|
%% handle_out(disconnect, success) emits the DISCONNECT and a normal close.
t_handle_out_disconnect(_) ->
    Disconnect = ?DISCONNECT_PACKET(?RC_SUCCESS),
    Replies = [{outgoing, Disconnect}, {close, normal}],
    {ok, Replies, _Chan} = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
|
|
|
|
%% An unknown handle_out/3 type is tolerated and returns {ok, Channel}.
t_handle_out_unexpected(_) ->
    Chan = channel(),
    {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, Chan).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for handle_call
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Kicking a channel always ends in `{shutdown, kicked, ok, ...}`.
%% For the connected v5 channel the reply additionally carries a DISCONNECT
%% packet with ?RC_ADMINISTRATIVE_ACTION; the other three variants are only
%% checked for the 4-tuple shape.
t_handle_call_kick(_) ->
    ConnectedV5 = channel(),
    ConnectedV4 = v4(ConnectedV5),
    {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, ConnectedV4),
    V5Reply = emqx_channel:handle_call(kick, ConnectedV5),
    {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = V5Reply,

    GoneV5 = channel(#{conn_state => disconnected}),
    GoneV4 = v4(GoneV5),

    {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, GoneV5),
    {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, GoneV4).
|
|
|
|
%% Kicking a channel that holds a will message must publish that message.
%% emqx_broker:publish/1 is mocked to forward whatever it receives to the
%% test process, which then waits up to 200 ms for it.
t_handle_kicked_publish_will_msg(_) ->
    TestPid = self(),
    Forward = fun(M) -> TestPid ! {pub, M} end,
    ok = meck:expect(emqx_broker, publish, Forward),

    WillMsg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),

    Kicked = emqx_channel:handle_call(kick, channel(#{will_msg => WillMsg})),
    {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = Kicked,
    receive
        {pub, WillMsg} ->
            ok
    after 200 ->
        exit(will_message_not_published)
    end.
|
|
|
|
%% A `discard` call shuts the channel down with reason `discarded` and a
%% DISCONNECT packet carrying ?RC_SESSION_TAKEN_OVER.
t_handle_call_discard(_) ->
    Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
    Reply = emqx_channel:handle_call(discard, channel()),
    {shutdown, discarded, ok, Packet, _Channel} = Reply.
|
|
|
|
%% `{takeover, 'begin'}` is answered with a `reply` 3-tuple.
t_handle_call_takeover_begin(_) ->
    Reply = emqx_channel:handle_call({takeover, 'begin'}, channel()),
    {reply, _Session, _Chan} = Reply.
|
|
|
|
%% `{takeover, 'end'}` shuts the channel down with reason `takenover` and an
%% empty list as reply; emqx_session:takeover/1 is stubbed to return ok.
t_handle_call_takeover_end(_) ->
    Stub = fun(_) -> ok end,
    ok = meck:expect(emqx_session, takeover, Stub),
    Reply = emqx_channel:handle_call({takeover, 'end'}, channel()),
    {shutdown, takenover, [], _, _Chan} = Reply.
|
|
|
|
%% An unknown call request is answered with `ignored`.
t_handle_call_unexpected(_) ->
    Reply = emqx_channel:handle_call(unexpected_req, channel()),
    {reply, ignored, _Chan} = Reply.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for handle_info
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% A `{subscribe, TopicFilters}` info message is handled with `{ok, Channel}`;
%% emqx_session:subscribe/4 is stubbed to hand the session back unchanged.
t_handle_info_subscribe(_) ->
    Stub = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, subscribe, Stub),
    Reply = emqx_channel:handle_info({subscribe, topic_filters()}, channel()),
    {ok, _Chan} = Reply.
|
|
|
|
%% An `{unsubscribe, TopicFilters}` info message is handled with
%% `{ok, Channel}`; emqx_session:unsubscribe/4 is stubbed to hand the session
%% back unchanged.
t_handle_info_unsubscribe(_) ->
    Stub = fun(_, _, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, unsubscribe, Stub),
    Reply = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()),
    {ok, _Chan} = Reply.
|
|
|
|
%% `sock_closed` on an already disconnected channel must return that very
%% channel unchanged.
t_handle_info_sock_closed(_) ->
    Disconnected = channel(#{conn_state => disconnected}),
    Reply = emqx_channel:handle_info({sock_closed, reason}, Disconnected),
    {ok, Disconnected} = Reply.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for handle_timeout
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Firing the registered stats timer is handled with `{ok, Channel}`;
%% emqx_cm:set_chan_stats/2 is stubbed to return ok.
t_handle_timeout_emit_stats(_) ->
    StatsRef = make_ref(),
    ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
    Timers = #{stats_timer => StatsRef},
    Channel = emqx_channel:set_field(timers, Timers, channel()),
    Reply = emqx_channel:handle_timeout(StatsRef, {emit_stats, []}, Channel),
    {ok, _Chan} = Reply.
|
|
|
|
%% A keepalive timeout is handled with `{ok, Channel}`.
%% NOTE(review): the timeout is delivered with a fresh reference rather than
%% the one registered as alive_timer -- confirm this is intentional.
t_handle_timeout_keepalive(_) ->
    AliveRef = make_ref(),
    Timers = #{alive_timer => AliveRef},
    Channel = emqx_channel:set_field(timers, Timers, channel()),
    Reply = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, Channel),
    {ok, _Chan} = Reply.
|
|
|
|
%% Firing the registered retry timer is handled with `{ok, Channel}`;
%% emqx_session:retry/2 is stubbed to hand the session back unchanged.
t_handle_timeout_retry_delivery(_) ->
    RetryRef = make_ref(),
    Stub = fun(_, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, retry, Stub),
    Timers = #{retry_timer => RetryRef},
    Channel = emqx_channel:set_field(timers, Timers, channel()),
    Reply = emqx_channel:handle_timeout(RetryRef, retry_delivery, Channel),
    {ok, _Chan} = Reply.
|
|
|
|
%% Firing the registered await timer is handled with `{ok, Channel}`;
%% emqx_session:expire/3 is stubbed to hand the session back unchanged.
t_handle_timeout_expire_awaiting_rel(_) ->
    AwaitRef = make_ref(),
    Stub = fun(_, _, Session) -> {ok, Session} end,
    ok = meck:expect(emqx_session, expire, Stub),
    Timers = #{await_timer => AwaitRef},
    Channel = emqx_channel:set_field(timers, Timers, channel()),
    Reply = emqx_channel:handle_timeout(AwaitRef, expire_awaiting_rel, Channel),
    {ok, _Chan} = Reply.
|
|
|
|
%% Firing the registered expire timer shuts the channel down with reason
%% `expired`.
t_handle_timeout_expire_session(_) ->
    ExpireRef = make_ref(),
    Timers = #{expire_timer => ExpireRef},
    Channel = emqx_channel:set_field(timers, Timers, channel()),
    Reply = emqx_channel:handle_timeout(ExpireRef, expire_session, Channel),
    {shutdown, expired, _Chan} = Reply.
|
|
|
|
%% A `will_message` timeout is handled with `{ok, Channel}`.
t_handle_timeout_will_message(_) ->
    Reply = emqx_channel:handle_timeout(make_ref(), will_message, channel()),
    {ok, _Chan} = Reply.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% enrich_conninfo/2 succeeds for the default CONNECT packet and channel.
t_enrich_conninfo(_) ->
    Enriched = emqx_channel:enrich_conninfo(connpkt(), channel()),
    {ok, _Chan} = Enriched.
|
|
|
|
%% enrich_client/2 succeeds for the default CONNECT packet and channel.
t_enrich_client(_) ->
    Enriched = emqx_channel:enrich_client(connpkt(), channel()),
    {ok, _ConnPkt, _Chan} = Enriched.
|
|
|
|
%% authenticate/2 succeeds for the default CONNECT packet and channel.
t_auth_connect(_) ->
    Connect = ?CONNECT_PACKET(connpkt()),
    Reply = emqx_channel:authenticate(Connect, channel()),
    {ok, _, _Chan} = Reply.
|
|
|
|
%% A PUBLISH with an empty topic and Topic-Alias 1 is resolved to topic
%% <<"t">> when the channel's inbound alias table maps 1 to <<"t">>.
t_process_alias(_) ->
    Props = #{'Topic-Alias' => 1},
    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = Props},
    Aliases = #{inbound => #{1 => <<"t">>}},
    Channel = emqx_channel:set_field(topic_aliases, Aliases, channel()),
    Resolved = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel),
    {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} = Resolved.
|
|
|
|
%% Using Topic-Alias 1 on a default channel is rejected with
%% ?RC_PROTOCOL_ERROR.
t_process_alias_inexistent_alias(_) ->
    Props = #{'Topic-Alias' => 1},
    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = Props},
    Channel = channel(),
    Packet = #mqtt_packet{variable = Publish},
    ?assertEqual({error, ?RC_PROTOCOL_ERROR}, emqx_channel:process_alias(Packet, Channel)).
|
|
|
|
%% Exercises packing_alias/2 with the outbound alias maximum set to 1:
%%   1. first publish on <<"x">> keeps the topic and gains 'Topic-Alias' => 1;
%%   2. the same publish again has its topic replaced by <<>>, same alias;
%%   3. a publish on <<"y">> comes back without an alias;
%%   4. on a default channel the topic <<"z">> is left in place.
t_packing_alias(_) ->
    UserProps = #{'User-Property' => [{<<"k">>, <<"v">>}]},
    AliasedProps = #{
        'Topic-Alias' => 1,
        'User-Property' => [{<<"k">>, <<"v">>}]
    },
    PubX = #mqtt_packet{
        variable = #mqtt_packet_publish{topic_name = <<"x">>, properties = UserProps}
    },
    PubY = #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>}},
    Channel0 = emqx_channel:set_field(alias_maximum, #{outbound => 1}, channel()),

    %% 1. alias gets assigned, topic still present
    {Out1, Channel1} = emqx_channel:packing_alias(PubX, Channel0),
    ExpectedFirst = #mqtt_packet{
        variable = #mqtt_packet_publish{topic_name = <<"x">>, properties = AliasedProps}
    },
    ?assertEqual(ExpectedFirst, Out1),

    %% 2. same topic again: alias reused, topic emptied
    {Out2, Channel2} = emqx_channel:packing_alias(PubX, Channel1),
    ExpectedSecond = #mqtt_packet{
        variable = #mqtt_packet_publish{topic_name = <<>>, properties = AliasedProps}
    },
    ?assertEqual(ExpectedSecond, Out2),

    %% 3. a different topic gets no alias
    {Out3, _} = emqx_channel:packing_alias(PubY, Channel2),
    ExpectedThird = #mqtt_packet{
        variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = #{}}
    },
    ?assertEqual(ExpectedThird, Out3),

    %% 4. default channel: topic is kept
    PubZ = #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
    ?assertMatch(
        {#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, _},
        emqx_channel:packing_alias(PubZ, channel())
    ).
|
|
|
|
%% Packing a PUBLISH that has an empty topic and Topic-Alias 1 on a default
%% channel must return the packet untouched, together with a channel whose
%% topic_aliases equal #{inbound => #{}, outbound => #{<<>> => 1}}.
t_packing_alias_inexistent_alias(_) ->
    Props = #{'Topic-Alias' => 1},
    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = Props},
    Channel = channel(),
    Packet = #mqtt_packet{variable = Publish},
    Aliases = #{inbound => #{}, outbound => #{<<>> => 1}},
    ExpectedChannel = emqx_channel:set_field(topic_aliases, Aliases, Channel),
    ?assertEqual({Packet, ExpectedChannel}, emqx_channel:packing_alias(Packet, Channel)).
|
|
|
|
%% With authorization enabled for the default zone, publishing to <<"t">>
%% passes check_pub_authz/2.
t_check_pub_authz(_) ->
    emqx_config:put_zone_conf(default, [authorization, enable], true),
    Packet = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
    Verdict = emqx_channel:check_pub_authz(Packet, channel()),
    ok = Verdict.
|
|
|
|
%% Topic-Alias 1 passes check_pub_alias/2 when the inbound alias maximum
%% is 10.
t_check_pub_alias(_) ->
    Props = #{'Topic-Alias' => 1},
    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = Props},
    Limits = #{inbound => 10},
    Channel = emqx_channel:set_field(alias_maximum, Limits, channel()),
    Verdict = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel),
    ok = Verdict.
|
|
|
|
%% With authorization enabled for the default zone, check_sub_authzs/2 pairs
%% the single topic filter with 0.
t_check_sub_authzs(_) ->
    emqx_config:put_zone_conf(default, [authorization, enable], true),
    TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
    Checked = emqx_channel:check_sub_authzs([TopicFilter], channel()),
    [{TopicFilter, 0}] = Checked.
|
|
|
|
%% enrich_connack_caps/2 must translate the capabilities reported by
%% emqx_mqtt_caps:get_caps/1 into the corresponding CONNACK properties.
%%
%% emqx_mqtt_caps is mocked for the duration of this test only. The mock is
%% removed in an `after` clause so that a failing assertion cannot leave the
%% module mocked for the test cases that run afterwards.
t_enrich_connack_caps(_) ->
    Caps = #{
        max_packet_size => 1024,
        max_qos_allowed => ?QOS_2,
        retain_available => true,
        max_topic_alias => 10,
        shared_subscription => true,
        wildcard_subscription => true
    },
    ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
    try
        ok = meck:expect(emqx_mqtt_caps, get_caps, fun(_Zone) -> Caps end),
        AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
        ?assertMatch(
            #{
                'Retain-Available' := 1,
                'Maximum-Packet-Size' := 1024,
                'Topic-Alias-Maximum' := 10,
                'Wildcard-Subscription-Available' := 1,
                'Subscription-Identifier-Available' := 1,
                'Shared-Subscription-Available' := 1
            },
            AckProps
        )
    after
        %% Runs on success and on failure alike.
        meck:unload(emqx_mqtt_caps)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for terminate
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% terminate/2 returns ok for each of the listed reason / channel pairs.
t_terminate(_) ->
    Cases = [
        {normal, #{}},
        {sock_error, #{conn_state => connected}},
        {{shutdown, kicked}, #{conn_state => connected}}
    ],
    lists:foreach(
        fun({Reason, Fields}) ->
            ok = emqx_channel:terminate(Reason, channel(Fields))
        end,
        Cases
    ).
|
|
|
|
%% A ws_cookie supplied in the connection info passed to init/2 must show up
%% under the same key in the channel's clientinfo.
t_ws_cookie_init(_) ->
    WsCookie = [{<<"session_id">>, <<"xyz">>}],
    Loopback = {127, 0, 0, 1},
    ConnInfo = #{
        socktype => ws,
        peername => {Loopback, 3456},
        sockname => {Loopback, 1883},
        peercert => nossl,
        conn_mod => emqx_ws_connection,
        ws_cookie => WsCookie
    },
    Opts = #{
        zone => default,
        limiter => undefined,
        listener => {tcp, default}
    },
    Channel = emqx_channel:init(ConnInfo, Opts),
    ClientInfo = emqx_channel:info(clientinfo, Channel),
    ?assertMatch(#{ws_cookie := WsCookie}, ClientInfo).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases for other mechanisms
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% A CONNECT that fails authentication must still trigger flapping detection.
%%
%% Flapping detection is switched on for the default zone, authentication is
%% stubbed to fail with not_authorized, and emqx_flapping:detect/1 is stubbed
%% to notify the test process. The test then waits up to two seconds for that
%% notification.
t_flapping_detect(_) ->
    emqx_config:put_zone_conf(default, [flapping_detect, enable], true),
    Parent = self(),
    ok = meck:expect(
        emqx_cm,
        open_session,
        fun(true, _ClientInfo, _ConnInfo) ->
            {ok, #{session => session(), present => false}}
        end
    ),
    ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
    ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
    IdleChannel = channel(#{conn_state => idle}),
    {shutdown, not_authorized, _ConnAck, _Channel} =
        emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
    receive
        flapping_detect -> ok
    after 2000 ->
        %% Diagnostic text fixed: was "exected in connecting progress".
        ?assert(false, "Flapping detect should be executed while connecting")
    end,
    meck:unload([emqx_flapping]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Builds a channel for the tests.
%%
%% The channel is initialised from a fixed MQTT v5 connection info map, then
%% the fields `clientinfo`, `session` and `conn_state` (connected) are set on
%% it. Entries in InitFields override those three defaults and may set any
%% additional field accepted by emqx_channel:set_field/3.
channel() ->
    channel(#{}).

channel(InitFields) ->
    Loopback = {127, 0, 0, 1},
    ConnInfo = #{
        peername => {Loopback, 3456},
        sockname => {Loopback, 1883},
        conn_mod => emqx_connection,
        proto_name => <<"MQTT">>,
        proto_ver => ?MQTT_PROTO_V5,
        clean_start => true,
        keepalive => 30,
        clientid => <<"clientid">>,
        username => <<"username">>,
        conn_props => #{},
        receive_maximum => 100,
        expiry_interval => 0
    },
    InitOpts = #{
        zone => default,
        limiter => undefined,
        listener => {tcp, default}
    },
    Fresh = emqx_channel:init(ConnInfo, InitOpts),
    Defaults = #{
        clientinfo => clientinfo(),
        session => session(),
        conn_state => connected
    },
    Fields = maps:merge(Defaults, InitFields),
    maps:fold(fun emqx_channel:set_field/3, Fresh, Fields).
|
|
|
|
%% Default client info used by the tests; entries in InitProps take
%% precedence over the defaults.
clientinfo() ->
    clientinfo(#{}).

clientinfo(InitProps) ->
    Defaults = #{
        zone => default,
        listener => {tcp, default},
        protocol => mqtt,
        peerhost => {127, 0, 0, 1},
        clientid => <<"clientid">>,
        username => <<"username">>,
        is_superuser => false,
        peercert => undefined,
        mountpoint => undefined
    },
    maps:merge(Defaults, InitProps).
|
|
|
|
%% The two wildcard filters "+" and "#", each paired with ?DEFAULT_SUBOPTS.
topic_filters() ->
    [{Filter, ?DEFAULT_SUBOPTS} || Filter <- [<<"+">>, <<"#">>]].
|
|
|
|
%% CONNECT packet record used by the tests: protocol "MQTT" at
%% ?MQTT_PROTO_V4 with fixed client id, username and password. Props becomes
%% the record's `properties` field.
connpkt() ->
    connpkt(#{}).

connpkt(Props) ->
    #mqtt_packet_connect{
        %% protocol
        proto_name = <<"MQTT">>,
        proto_ver = ?MQTT_PROTO_V4,
        %% flags and timing
        is_bridge = false,
        clean_start = true,
        keepalive = 30,
        %% caller-supplied properties
        properties = Props,
        %% identity
        clientid = <<"clientid">>,
        username = <<"username">>,
        password = <<"passwd">>
    }.
|
|
|
|
%% Session used by the tests: initialised with max_inflight 0, after which
%% every entry of InitFields is applied through emqx_session:set_field/3.
session() ->
    session(#{}).

session(InitFields) when is_map(InitFields) ->
    Fresh = emqx_session:init(#{max_inflight => 0}),
    maps:fold(fun emqx_session:set_field/3, Fresh, InitFields).
|
|
|
|
%% conn: 5/s; overall: 10/s
|
|
%% Asks the limiter container for this suite's message_routing limiter,
%% configured by limiter_cfg/0.
quota() ->
    Types = [message_routing],
    emqx_limiter_container:get_limiter_by_types(?MODULE, Types, limiter_cfg()).
|
|
|
|
%% Limiter configuration: the message_routing bucket from bucket_cfg/0 plus a
%% per-client message_routing entry with rate and capacity 5.
limiter_cfg() ->
    ClientRouting = #{
        rate => 5,
        initial => 0,
        capacity => 5,
        low_watermark => 1,
        divisible => false,
        max_retry_time => timer:seconds(5),
        failure_strategy => force
    },
    Bucket = bucket_cfg(),
    #{
        message_routing => Bucket,
        client => #{message_routing => ClientRouting}
    }.
|
|
|
|
%% Bucket settings shared by limiter_cfg/0 and add_bucket/0.
bucket_cfg() ->
    #{
        rate => 10,
        initial => 0,
        capacity => 10
    }.
|
|
|
|
%% Registers this suite's message_routing bucket with the limiter server.
add_bucket() ->
    Cfg = bucket_cfg(),
    emqx_limiter_server:add_bucket(?MODULE, message_routing, Cfg).
|
|
|
|
%% Removes this suite's message_routing bucket from the limiter server.
del_bucket() ->
    Owner = ?MODULE,
    emqx_limiter_server:del_bucket(Owner, message_routing).
|
|
|
|
%% Returns Channel with proto_ver in its conninfo replaced by
%% ?MQTT_PROTO_V4.
v4(Channel) ->
    ConnInfo = emqx_channel:info(conninfo, Channel),
    Downgraded = ConnInfo#{proto_ver => ?MQTT_PROTO_V4},
    emqx_channel:set_field(conninfo, Downgraded, Channel).
|