From 2cf3af12efeaf22893c10fef8fd45635392fa52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=A5=87=E6=80=AA?= Date: Fri, 20 Dec 2019 19:15:45 +0800 Subject: [PATCH] Update test cases for emqx_connection (#3116) --- src/emqx_connection.erl | 2 + test/emqx_connection_SUITE.erl | 531 +++++++++++++++++---------------- 2 files changed, 277 insertions(+), 256 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 605e78bf7..40ec230e9 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -136,6 +136,8 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; +info(stats_timer, #state{stats_timer = Stats_timer}) -> + Stats_timer; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9f87ce3b2..d0c3f23f5 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -35,6 +35,10 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + %% Meck Limiter + ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), + %% Meck Pd + ok = meck:new(emqx_pd, [passthrough, no_history, no_link]), %% Meck Metrics ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), @@ -46,6 +50,8 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_transport), ok = meck:unload(emqx_channel), ok = meck:unload(emqx_cm), + ok = meck:unload(emqx_limiter), + ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok. @@ -72,6 +78,233 @@ end_per_testcase(_TestCase, Config) -> %% Test cases %%-------------------------------------------------------------------- +t_info(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, info} -> + gen_server:reply(From, emqx_connection:info(st())) + after + 0 -> error("error") + end + end), + #{sockinfo := SockInfo} = emqx_connection:info(CPid), + ?assertMatch(#{active_n := 100, + peername := {{127,0,0,1},3456}, + sockname := {{127,0,0,1},1883}, + sockstate := idle, + socktype := tcp}, SockInfo). + +t_info_limiter(_) -> + St = st(#{limiter => emqx_limiter:init([])}), + ?assertEqual(undefined, emqx_connection:info(limiter, St)). + +t_stats(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, stats} -> + gen_server:reply(From, emqx_connection:stats(st())) + after + 0 -> error("error") + end + end), + Stats = emqx_connection:stats(CPid), + ?assertMatch([{recv_oct,0}, + {recv_cnt,0}, + {send_oct,0}, + {send_cnt,0}, + {send_pend,0}| _] , Stats). + +t_process_msg(_) -> + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, + fun(_Packet, Channel) -> + {ok, Channel} + end), + CPid ! {incoming, ?PACKET(?PINGREQ)}, + CPid ! {incoming, undefined}, + CPid ! {tcp_passive, sock}, + CPid ! {tcp_closed, sock}, + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, tcp_closed}) + end, #{trap_exit => true}). + +t_ensure_stats_timer(_) -> + NStats = emqx_connection:ensure_stats_timer(100, st()), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assert(is_reference(Stats_timer)), + ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)). + +t_cancel_stats_timer(_) -> + NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assertEqual(undefined, Stats_timer), + ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)). + +t_append_msg(_) -> + ?assertEqual([msg], emqx_connection:append_msg([], [msg])), + ?assertEqual([msg], emqx_connection:append_msg([], msg)), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], [msg])), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], msg)). + +t_handle_msg(_) -> + From = {make_ref(), self()}, + ?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())). + +t_handle_msg_incoming(_) -> + ?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())), + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())). + +t_handle_msg_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())). + +t_handle_msg_tcp_error(_) -> + ?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())). + +t_handle_msg_tcp_closed(_) -> + ?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())). + +t_handle_msg_passive(_) -> + ?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())). + +t_handle_msg_deliver(_) -> + ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())). + +t_handle_msg_inet_reply(_) -> + ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), + ?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). + +t_handle_msg_connack(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())). + +t_handle_msg_close(_) -> + ?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())). + +t_handle_msg_event(_) -> + ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end), + ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), + ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), + ?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())). + +t_handle_msg_timeout(_) -> + ?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())). + +t_handle_msg_shutdown(_) -> + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())). + +t_handle_call(_) -> + St = st(), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)), + ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)), + ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)), + ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)), + ?assertEqual({stop, {shutdown,kicked}, ok, St}, emqx_connection:handle_call(self(), kick, St)). + +t_handle_timeout(_) -> + TRef = make_ref(), + State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}), + ?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)), + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + + ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), + ?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)). + +t_parse_incoming(_) -> + ?assertMatch({ok, [], _NState}, emqx_connection:parse_incoming(<<>>, st())), + ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())). + +t_next_incoming_msgs(_) -> + ?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])), + ?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])). + +t_handle_incoming(_) -> + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())), + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())). + +t_with_channel(_) -> + State = st(), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end), + ?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end), + ?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end), + ?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)). + +t_handle_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), + ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). + +t_handle_info(_) -> + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())), + ?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())), + ?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())). + +t_ensure_rate_limit(_) -> + State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})), + ?assertEqual(undefined, emqx_connection:info(limiter, State)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end), + State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State1)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end), + State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State2)), + ?assertEqual(blocked, emqx_connection:info(sockstate, State2)). + +t_activate_socket(_) -> + State = st(), + {ok, NStats} = emqx_connection:activate_socket(State), + ?assertEqual(running, emqx_connection:info(sockstate, NStats)), + + State1 = st(#{sockstate => blocked}), + ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), + + State2 = st(#{sockstate => closed}), + ?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)). + +t_close_socket(_) -> + State = emqx_connection:close_socket(st(#{sockstate => closed})), + ?assertEqual(closed, emqx_connection:info(sockstate, State)), + State1 = emqx_connection:close_socket(st()), + ?assertEqual(closed, emqx_connection:info(sockstate, State1)). + +t_system_code_change(_) -> + State = st(), + ?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])). + +t_next_msgs(_) -> + ?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())), + ?assertEqual({}, emqx_connection:next_msgs({})), + ?assertEqual([], emqx_connection:next_msgs([])). + t_start_link_ok(_) -> with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end). @@ -99,262 +332,6 @@ t_get_conn_info(_) -> }, SockInfo) end). -t_handle_call_discard(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}). - -t_handle_call_takeover(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun({takeover, 'begin'}, Channel) -> - {reply, session, Channel}; - ({takeover, 'end'}, Channel) -> - {shutdown, takeovered, [], Channel} - end), - session = emqx_connection:call(CPid, {takeover, 'begin'}), - [] = emqx_connection:call(CPid, {takeover, 'end'}), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, takeovered}) - end, #{trap_exit => true}). - -t_handle_call_any(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(_Req, Channel) -> {reply, ok, Channel} end), - ok = emqx_connection:call(CPid, req) - end). - -t_handle_incoming_connect(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, - proto_name = <<"MQTT">>, - clientid = <<>>, - clean_start = true, - keepalive = 60 - }, - Frame = make_frame(?CONNECT_PACKET(ConnPkt)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_publish(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_subscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_unsubscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - CPid ! {incoming, undefined} - end). - -t_handle_sock_error(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({_, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - %% TODO: fixme later - CPid ! {tcp_error, sock, econnreset}, - timer:sleep(100), - trap_exit(CPid, {shutdown, econnreset}) - end, #{trap_exit => true}). - -t_handle_sock_activate(_) -> - with_conn(fun(CPid) -> CPid ! activate_socket end). - -t_handle_sock_closed(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, ?DISCONNECT_PACKET(), Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}). - -t_handle_outgoing(_) -> - with_conn(fun(CPid) -> - Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), - CPid ! {outgoing, Publish}, - CPid ! {outgoing, ?PUBREL_PACKET(1)}, - CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} - end). - -t_conn_rate_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - lists:foreach(fun(I) -> - Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), - CPid ! {tcp, sock, make_frame(Publish)} - end, [1, 2]) - end, #{active_n => 1, rate_limit => {1, 1024}}). - -t_conn_pub_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - ok = lists:foreach(fun(I) -> - CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} - end, lists:seq(1, 3)) - %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) - end, #{active_n => 1, publish_limit => {1, 2}}). - -t_conn_pingreq(_) -> - with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end). - -t_inet_reply(_) -> - ok = meck:new(emqx_pd, [passthrough, no_history]), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - CPid ! {inet_reply, for_testing, ok}, - timer:sleep(100) - end, #{active_n => 1, trap_exit => true}), - ok = meck:unload(emqx_pd), - with_conn(fun(CPid) -> - CPid ! {inet_reply, for_testing, {error, for_testing}}, - timer:sleep(100), - trap_exit(CPid, {shutdown, for_testing}) - end, #{trap_exit => true}). - -t_deliver(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_deliver, - fun(_, Channel) -> {ok, Channel} end), - CPid ! {deliver, topic, msg} - end). - -t_event_disconnected(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), - CPid ! {event, disconnected} - end). - -t_event_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {event, undefined} - end). - -t_cloes(_) -> - with_conn(fun(CPid) -> - CPid ! {close, normal}, - timer:sleep(100), - trap_exit(CPid, {shutdown, normal}) - end, #{trap_exit => true}). - -t_oom_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! {shutdown, message_queue_too_long}, - timer:sleep(100), - trap_exit(CPid, {shutdown, message_queue_too_long}) - end, #{trap_exit => true}). - -t_handle_idle_timeout(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 10), - with_conn(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, {shutdown, idle_timeout}) - end, #{zone => external, trap_exit => true}). - -t_handle_emit_stats(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 1000), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, - max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 - })}, - timer:sleep(1000) - end,#{zone => external, trap_exit => true}). - -t_handle_limit_timeout(_) -> - with_conn(fun(CPid) -> - CPid ! {timeout, undefined, limit_timeout}, - timer:sleep(100), - true = erlang:is_process_alive(CPid) - end). - -t_handle_keepalive_timeout(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - trap_exit(CPid, {shutdown, keepalive_timeout}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - false = erlang:is_process_alive(CPid) - end, #{trap_exit => true}). - -t_handle_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! Shutdown = {shutdown, reason}, - timer:sleep(100), - trap_exit(CPid, Shutdown) - end, #{trap_exit => true}). - -t_exit_message(_) -> - with_conn(fun(CPid) -> - CPid ! {'EXIT', CPid, for_testing}, - timer:sleep(1000) - end, #{trap_exit => true}). - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -406,3 +383,45 @@ make_frame(Packet) -> payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). +st() -> st(#{}). +st(InitFields) when is_map(InitFields) -> + St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]), + maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end, + emqx_connection:set_field(channel, channel(), St), + InitFields + ). + +channel() -> channel(#{}). +channel(InitFields) -> + ConnInfo = #{peername => {{127,0,0,1}, 3456}, + sockname => {{127,0,0,1}, 18083}, + conn_mod => emqx_connection, + proto_name => <<"MQTT">>, + proto_ver => ?MQTT_PROTO_V5, + clean_start => true, + keepalive => 30, + clientid => <<"clientid">>, + username => <<"username">>, + receive_maximum => 100, + expiry_interval => 0 + }, + ClientInfo = #{zone => zone, + protocol => mqtt, + peerhost => {127,0,0,1}, + clientid => <<"clientid">>, + username => <<"username">>, + is_superuser => false, + peercert => undefined, + mountpoint => undefined + }, + Session = emqx_session:init(#{zone => external}, + #{receive_maximum => 0} + ), + maps:fold(fun(Field, Value, Channel) -> + emqx_channel:set_field(Field, Value, Channel) + end, + emqx_channel:init(ConnInfo, [{zone, zone}]), + maps:merge(#{clientinfo => ClientInfo, + session => Session, + conn_state => connected + }, InitFields)). \ No newline at end of file