From 2ef52828bcd802b444b6be4bf0d0862d8f896f83 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 14 Dec 2019 12:42:45 +0800 Subject: [PATCH] Improve 'emqx_connection' module and update test cases --- src/emqx_connection.erl | 54 +- test/emqx_connection_SUITE.erl | 498 ++++++++---------- ...interval_SUITE.erl => emqx_mqtt_SUITE.erl} | 34 +- 3 files changed, 284 insertions(+), 302 deletions(-) rename test/{emqx_msg_expiry_interval_SUITE.erl => emqx_mqtt_SUITE.erl} (76%) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index b88a1dfb4..3461788d5 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -347,11 +347,10 @@ handle_msg({Passive, _Sock}, State) NState1 = check_oom(run_gc(InStats, NState)), handle_info(activate_socket, NState1); -handle_msg(Deliver = {deliver, _Topic, _Msg}, State = - #state{active_n = ActiveN, channel = Channel}) -> +handle_msg(Deliver = {deliver, _Topic, _Msg}, + State = #state{active_n = ActiveN}) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], - Ret = emqx_channel:handle_deliver(Delivers, Channel), - handle_chan_return(Ret, State); + with_channel(handle_deliver, [Delivers], State); %% Something sent handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> @@ -471,9 +470,8 @@ handle_timeout(TRef, keepalive, State = handle_info({sock_error, Reason}, State) end; -handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> - Ret = emqx_channel:handle_timeout(TRef, Msg, Channel), - handle_chan_return(Ret, State). +handle_timeout(TRef, Msg, State) -> + with_channel(handle_timeout, [TRef, Msg], State). %%-------------------------------------------------------------------- %% Parse incoming data @@ -509,30 +507,31 @@ next_incoming_msgs(Packets) -> %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet, State = #state{channel = Channel}) - when is_record(Packet, mqtt_packet) -> +handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), - handle_chan_return(emqx_channel:handle_in(Packet, Channel), State); + with_channel(handle_in, [Packet], State); -handle_incoming(FrameError, State = #state{channel = Channel}) -> - handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State). +handle_incoming(FrameError, State) -> + with_channel(handle_in, [FrameError], State). %%-------------------------------------------------------------------- -%% Handle channel return +%% With Channel -handle_chan_return(ok, State) -> - {ok, State}; -handle_chan_return({ok, NChannel}, State) -> - {ok, State#state{channel = NChannel}}; -handle_chan_return({ok, Replies, NChannel}, State) -> - {ok, next_msgs(Replies), State#state{channel = NChannel}}; -handle_chan_return({shutdown, Reason, NChannel}, State) -> - shutdown(Reason, State#state{channel = NChannel}); -handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) -> - NState = State#state{channel = NChannel}, - ok = handle_outgoing(OutPacket, NState), - shutdown(Reason, NState). +with_channel(Fun, Args, State = #state{channel = Channel}) -> + case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of + ok -> {ok, State}; + {ok, NChannel} -> + {ok, State#state{channel = NChannel}}; + {ok, Replies, NChannel} -> + {ok, next_msgs(Replies), State#state{channel = NChannel}}; + {shutdown, Reason, NChannel} -> + shutdown(Reason, State#state{channel = NChannel}); + {shutdown, Reason, Packet, NChannel} -> + NState = State#state{channel = NChannel}, + ok = handle_outgoing(Packet, NState), + shutdown(Reason, NState) + end. %%-------------------------------------------------------------------- %% Handle outgoing packets @@ -589,9 +588,8 @@ handle_info({sock_error, Reason}, State) -> ?LOG(debug, "Socket error: ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); -handle_info(Info, State = #state{channel = Channel}) -> - Ret = emqx_channel:handle_info(Info, Channel), - handle_chan_return(Ret, State). +handle_info(Info, State) -> + with_channel(handle_info, [Info], State). %%-------------------------------------------------------------------- %% Ensure rate limit diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 38f531371..9f87ce3b2 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -22,43 +22,34 @@ -include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, - recv_oct, recv_cnt, send_oct, send_cnt, - send_pend - ]). - -all() -> emqx_ct:all(?MODULE) ++ [{group, real_client}]. - -groups() -> - [{real_client, [non_parallel_tests], - [ - g_get_conn_stats, - g_handle_sock_passive - ]}]. +all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- init_per_suite(Config) -> + %% Meck Transport + ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]), + %% Meck Channel + ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), + %% Meck Cm + ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + %% Meck Metrics + ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), + ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end), + ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end), Config. end_per_suite(_Config) -> + ok = meck:unload(emqx_transport), + ok = meck:unload(emqx_channel), + ok = meck:unload(emqx_cm), + ok = meck:unload(emqx_metrics), ok. -init_per_group(real_client, Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([]), - Config; -init_per_group(_, Config) -> Config. - -end_per_group(real_client, _Config) -> - emqx_ct_helpers:stop_apps([]); -end_per_group(_, Config) -> Config. - init_per_testcase(_TestCase, Config) -> - %% Meck Transport - ok = meck:new(emqx_transport, [non_strict, passthrough, no_history]), ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end), ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end), ok = meck:expect(emqx_transport, ensure_ok_or_exit, @@ -72,22 +63,9 @@ init_per_testcase(_TestCase, Config) -> end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), - %% Meck Channel - ok = meck:new(emqx_channel, [passthrough, no_history]), - %% Meck Cm - ok = meck:new(emqx_cm, [passthrough, no_history]), - %% Meck Metrics - ok = meck:new(emqx_metrics, [passthrough, no_history]), - ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), - ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end), - ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end), Config. end_per_testcase(_TestCase, Config) -> - ok = meck:unload(emqx_transport), - ok = meck:unload(emqx_channel), - ok = meck:unload(emqx_cm), - ok = meck:unload(emqx_metrics), Config. %%-------------------------------------------------------------------- @@ -95,9 +73,7 @@ end_per_testcase(_TestCase, Config) -> %%-------------------------------------------------------------------- t_start_link_ok(_) -> - with_connection(fun(CPid) -> - state = element(1, sys:get_state(CPid)) - end). + with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end). t_start_link_exit_on_wait(_) -> ok = exit_on_wait_error(enotconn, normal), @@ -113,123 +89,114 @@ t_start_link_exit_on_activate(_) -> ok = exit_on_activate_error(econnreset, {shutdown, econnreset}). t_get_conn_info(_) -> - with_connection(fun(CPid) -> - #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertEqual(#{active_n => 100, - peername => {{127,0,0,1},3456}, - sockname => {{127,0,0,1},1883}, - sockstate => running, - socktype => tcp}, SockInfo) - end). - -g_get_conn_stats(_) -> - with_client(fun(CPid) -> - Stats = emqx_connection:stats(CPid), - ct:pal("==== stats: ~p", [Stats]), - [?assert(proplists:get_value(Key, Stats) >= 0) || Key <- ?STATS_KYES] - end, []). + with_conn(fun(CPid) -> + #{sockinfo := SockInfo} = emqx_connection:info(CPid), + ?assertEqual(#{active_n => 100, + peername => {{127,0,0,1},3456}, + sockname => {{127,0,0,1},1883}, + sockstate => running, + socktype => tcp + }, SockInfo) + end). t_handle_call_discard(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}), - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_call, + fun(discard, Channel) -> + {shutdown, discarded, ok, Channel} + end), + ok = emqx_connection:call(CPid, discard), + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, discarded}) + end, #{trap_exit => true}), + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_call, + fun(discard, Channel) -> + {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} + end), + ok = emqx_connection:call(CPid, discard), + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, discarded}) + end, #{trap_exit => true}). t_handle_call_takeover(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun({takeover, 'begin'}, Channel) -> - {reply, session, Channel}; - ({takeover, 'end'}, Channel) -> - {shutdown, takeovered, [], Channel} - end), - session = emqx_connection:call(CPid, {takeover, 'begin'}), - [] = emqx_connection:call(CPid, {takeover, 'end'}), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, takeovered}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_call, + fun({takeover, 'begin'}, Channel) -> + {reply, session, Channel}; + ({takeover, 'end'}, Channel) -> + {shutdown, takeovered, [], Channel} + end), + session = emqx_connection:call(CPid, {takeover, 'begin'}), + [] = emqx_connection:call(CPid, {takeover, 'end'}), + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, takeovered}) + end, #{trap_exit => true}). t_handle_call_any(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(_Req, Channel) -> {reply, ok, Channel} end), - ok = emqx_connection:call(CPid, req) - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_call, + fun(_Req, Channel) -> {reply, ok, Channel} end), + ok = emqx_connection:call(CPid, req) + end). t_handle_incoming_connect(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, - proto_name = <<"MQTT">>, - clientid = <<>>, - clean_start = true, - keepalive = 60 - }, - Frame = make_frame(?CONNECT_PACKET(ConnPkt)), - CPid ! {tcp, sock, Frame} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, + proto_name = <<"MQTT">>, + clientid = <<>>, + clean_start = true, + keepalive = 60 + }, + Frame = make_frame(?CONNECT_PACKET(ConnPkt)), + CPid ! {tcp, sock, Frame} + end). t_handle_incoming_publish(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), - CPid ! {tcp, sock, Frame} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), + CPid ! {tcp, sock, Frame} + end). t_handle_incoming_subscribe(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + Frame = <>, + CPid ! {tcp, sock, Frame} + end). t_handle_incoming_unsubscribe(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + Frame = <>, + CPid ! {tcp, sock, Frame} + end). t_handle_incoming_undefined(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - CPid ! {incoming, undefined} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + CPid ! {incoming, undefined} + end). t_handle_sock_error(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({_, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_info, + fun({_, Reason}, Channel) -> + {shutdown, Reason, Channel} + end), %% TODO: fixme later CPid ! {tcp_error, sock, econnreset}, timer:sleep(100), trap_exit(CPid, {shutdown, econnreset}) - end, #{trap_exit => true}). - -g_handle_sock_passive(_) -> - with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). + end, #{trap_exit => true}). t_handle_sock_activate(_) -> - with_connection(fun(CPid) -> CPid ! activate_socket end). + with_conn(fun(CPid) -> CPid ! activate_socket end). t_handle_sock_closed(_) -> - with_connection(fun(CPid) -> + with_conn(fun(CPid) -> ok = meck:expect(emqx_channel, handle_info, fun({sock_closed, Reason}, Channel) -> {shutdown, Reason, Channel} @@ -238,157 +205,155 @@ t_handle_sock_closed(_) -> timer:sleep(100), trap_exit(CPid, {shutdown, tcp_closed}) end, #{trap_exit => true}), - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, ?DISCONNECT_PACKET(), Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_info, + fun({sock_closed, Reason}, Channel) -> + {shutdown, Reason, ?DISCONNECT_PACKET(), Channel} + end), + CPid ! {tcp_closed, sock}, + timer:sleep(100), + trap_exit(CPid, {shutdown, tcp_closed}) + end, #{trap_exit => true}). t_handle_outgoing(_) -> - with_connection(fun(CPid) -> - Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), - CPid ! {outgoing, Publish}, - CPid ! {outgoing, ?PUBREL_PACKET(1)}, - CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} - end). + with_conn(fun(CPid) -> + Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), + CPid ! {outgoing, Publish}, + CPid ! {outgoing, ?PUBREL_PACKET(1)}, + CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} + end). t_conn_rate_limit(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - lists:foreach(fun(I) -> - Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), - CPid ! {tcp, sock, make_frame(Publish)} - end, [1, 2]) - %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) - end, #{active_n => 1, rate_limit => {1, 1024}}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), + lists:foreach(fun(I) -> + Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), + CPid ! {tcp, sock, make_frame(Publish)} + end, [1, 2]) + end, #{active_n => 1, rate_limit => {1, 1024}}). t_conn_pub_limit(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - ok = lists:foreach(fun(I) -> - CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} - end, lists:seq(1, 3)) - %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) - end, #{active_n => 1, publish_limit => {1, 2}}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), + ok = lists:foreach(fun(I) -> + CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} + end, lists:seq(1, 3)) + %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) + end, #{active_n => 1, publish_limit => {1, 2}}). t_conn_pingreq(_) -> - with_connection(fun(CPid) -> - CPid ! {incoming, ?PACKET(?PINGREQ)} - end). + with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end). t_inet_reply(_) -> ok = meck:new(emqx_pd, [passthrough, no_history]), - with_connection(fun(CPid) -> - ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - CPid ! {inet_reply, for_testing, ok}, - timer:sleep(100) - end, #{active_n => 1, trap_exit => true}), + with_conn(fun(CPid) -> + ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), + CPid ! {inet_reply, for_testing, ok}, + timer:sleep(100) + end, #{active_n => 1, trap_exit => true}), ok = meck:unload(emqx_pd), - with_connection(fun(CPid) -> - CPid ! {inet_reply, for_testing, {error, for_testing}}, - timer:sleep(100), - trap_exit(CPid, {shutdown, for_testing}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! {inet_reply, for_testing, {error, for_testing}}, + timer:sleep(100), + trap_exit(CPid, {shutdown, for_testing}) + end, #{trap_exit => true}). t_deliver(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end), - CPid ! {deliver, topic, msg} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_deliver, + fun(_, Channel) -> {ok, Channel} end), + CPid ! {deliver, topic, msg} + end). t_event_disconnected(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), - CPid ! {event, disconnected} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), + ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), + CPid ! {event, disconnected} + end). t_event_undefined(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {event, undefined} - end). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), + ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), + ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), + CPid ! {event, undefined} + end). t_cloes(_) -> - with_connection(fun(CPid) -> - CPid ! {close, normal}, - timer:sleep(100), - trap_exit(CPid, {shutdown, normal}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! {close, normal}, + timer:sleep(100), + trap_exit(CPid, {shutdown, normal}) + end, #{trap_exit => true}). t_oom_shutdown(_) -> - with_connection(fun(CPid) -> - CPid ! {shutdown, message_queue_too_long}, - timer:sleep(100), - trap_exit(CPid, {shutdown, message_queue_too_long}) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! {shutdown, message_queue_too_long}, + timer:sleep(100), + trap_exit(CPid, {shutdown, message_queue_too_long}) + end, #{trap_exit => true}). t_handle_idle_timeout(_) -> ok = emqx_zone:set_env(external, idle_timeout, 10), - with_connection(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, {shutdown, idle_timeout}) - end, #{zone => external, trap_exit => true}). + with_conn(fun(CPid) -> + timer:sleep(100), + trap_exit(CPid, {shutdown, idle_timeout}) + end, #{zone => external, trap_exit => true}). t_handle_emit_stats(_) -> ok = emqx_zone:set_env(external, idle_timeout, 1000), - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, - max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 - })}, - timer:sleep(1000) - end,#{zone => external, trap_exit => true}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), + CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, + max_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4 + })}, + timer:sleep(1000) + end,#{zone => external, trap_exit => true}). t_handle_limit_timeout(_) -> - with_connection(fun(CPid) -> - CPid ! {timeout, undefined, limit_timeout}, - timer:sleep(100), - false = erlang:is_process_alive(CPid) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! {timeout, undefined, limit_timeout}, + timer:sleep(100), + true = erlang:is_process_alive(CPid) + end). t_handle_keepalive_timeout(_) -> - with_connection(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - trap_exit(CPid, {shutdown, keepalive_timeout}) - end, #{trap_exit => true}), - with_connection(fun(CPid) -> - ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - false = erlang:is_process_alive(CPid) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_timeout, + fun(_TRef, _TMsg, Channel) -> + {shutdown, keepalive_timeout, Channel} + end), + CPid ! {timeout, make_ref(), keepalive}, + timer:sleep(100), + trap_exit(CPid, {shutdown, keepalive_timeout}) + end, #{trap_exit => true}), + with_conn(fun(CPid) -> + ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), + ok = meck:expect(emqx_channel, handle_timeout, + fun(_TRef, _TMsg, Channel) -> + {shutdown, keepalive_timeout, Channel} + end), + CPid ! {timeout, make_ref(), keepalive}, + timer:sleep(100), + false = erlang:is_process_alive(CPid) + end, #{trap_exit => true}). t_handle_shutdown(_) -> - with_connection(fun(CPid) -> - CPid ! Shutdown = {shutdown, reason}, - timer:sleep(100), - trap_exit(CPid, Shutdown) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! Shutdown = {shutdown, reason}, + timer:sleep(100), + trap_exit(CPid, Shutdown) + end, #{trap_exit => true}). t_exit_message(_) -> - with_connection(fun(CPid) -> - CPid ! {'EXIT', CPid, for_testing}, - timer:sleep(1000) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + CPid ! {'EXIT', CPid, for_testing}, + timer:sleep(1000) + end, #{trap_exit => true}). %%-------------------------------------------------------------------- %% Helper functions @@ -399,27 +364,28 @@ exit_on_wait_error(SockErr, Reason) -> fun(_Sock) -> {error, SockErr} end), - with_connection(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, Reason) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + timer:sleep(100), + trap_exit(CPid, Reason) + end, #{trap_exit => true}). exit_on_activate_error(SockErr, Reason) -> ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> {error, SockErr} end), - with_connection(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, Reason) - end, #{trap_exit => true}). + with_conn(fun(CPid) -> + timer:sleep(100), + trap_exit(CPid, Reason) + end, #{trap_exit => true}). -with_connection(TestFun) -> - with_connection(TestFun, #{trap_exit => false}). +with_conn(TestFun) -> + with_conn(TestFun, #{trap_exit => false}). -with_connection(TestFun, Options) when is_map(Options) -> - with_connection(TestFun, maps:to_list(Options)); -with_connection(TestFun, Options) -> +with_conn(TestFun, Options) when is_map(Options) -> + with_conn(TestFun, maps:to_list(Options)); + +with_conn(TestFun, Options) -> TrapExit = proplists:get_value(trap_exit, Options, false), process_flag(trap_exit, TrapExit), {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options), @@ -427,18 +393,6 @@ with_connection(TestFun, Options) -> TrapExit orelse emqx_connection:stop(CPid), ok. -with_client(TestFun, _Options) -> - ClientId = <<"t_conn">>, - {ok, C} = emqtt:start_link([{clientid, ClientId}]), - {ok, _} = emqtt:connect(C), - timer:sleep(50), - case emqx_cm:lookup_channels(ClientId) of - [] -> ct:fail({client_not_started, ClientId}); - [ChanPid] -> - TestFun(ChanPid), - emqtt:stop(C) - end. - trap_exit(Pid, Reason) -> receive {'EXIT', Pid, Reason} -> ok; diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_mqtt_SUITE.erl similarity index 76% rename from test/emqx_msg_expiry_interval_SUITE.erl rename to test/emqx_mqtt_SUITE.erl index 063cf19dc..ffdf07800 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -14,23 +14,40 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_msg_expiry_interval_SUITE). +-module(emqx_mqtt_SUITE). -compile(export_all). -compile(nowarn_export_all). +-include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, + recv_oct, recv_cnt, send_oct, send_cnt, + send_pend + ]). all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +t_conn_stats(_) -> + with_client(fun(CPid) -> + Stats = emqx_connection:stats(CPid), + ct:pal("==== stats: ~p", [Stats]), + [?assert(proplists:get_value(Key, Stats) >= 0) || Key <- ?STATS_KYES] + end, []). + +t_tcp_sock_passive(_) -> + with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). + t_message_expiry_interval_1(_) -> ClientA = message_expiry_interval_init(), [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], @@ -92,3 +109,16 @@ message_expiry_interval_not_exipred(ClientA, QoS) -> ct:fail(no_publish_received) end, emqtt:stop(ClientB1). + +with_client(TestFun, _Options) -> + ClientId = <<"t_conn">>, + {ok, C} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + timer:sleep(50), + case emqx_cm:lookup_channels(ClientId) of + [] -> ct:fail({client_not_started, ClientId}); + [ChanPid] -> + TestFun(ChanPid), + emqtt:stop(C) + end. +