Improve 'emqx_connection' module and update test cases

This commit is contained in:
Feng Lee 2019-12-14 12:42:45 +08:00
parent 03681b6a3b
commit 2ef52828bc
3 changed files with 284 additions and 302 deletions

View File

@ -347,11 +347,10 @@ handle_msg({Passive, _Sock}, State)
NState1 = check_oom(run_gc(InStats, NState)), NState1 = check_oom(run_gc(InStats, NState)),
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, State = handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = ActiveN, channel = Channel}) -> State = #state{active_n = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Ret = emqx_channel:handle_deliver(Delivers, Channel), with_channel(handle_deliver, [Delivers], State);
handle_chan_return(Ret, State);
%% Something sent %% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
@ -471,9 +470,8 @@ handle_timeout(TRef, keepalive, State =
handle_info({sock_error, Reason}, State) handle_info({sock_error, Reason}, State)
end; end;
handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> handle_timeout(TRef, Msg, State) ->
Ret = emqx_channel:handle_timeout(TRef, Msg, Channel), with_channel(handle_timeout, [TRef, Msg], State).
handle_chan_return(Ret, State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Parse incoming data %% Parse incoming data
@ -509,30 +507,31 @@ next_incoming_msgs(Packets) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
handle_incoming(Packet, State = #state{channel = Channel}) handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
when is_record(Packet, mqtt_packet) ->
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
handle_chan_return(emqx_channel:handle_in(Packet, Channel), State); with_channel(handle_in, [Packet], State);
handle_incoming(FrameError, State = #state{channel = Channel}) -> handle_incoming(FrameError, State) ->
handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State). with_channel(handle_in, [FrameError], State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle channel return %% With Channel
handle_chan_return(ok, State) -> with_channel(Fun, Args, State = #state{channel = Channel}) ->
{ok, State}; case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
handle_chan_return({ok, NChannel}, State) -> ok -> {ok, State};
{ok, State#state{channel = NChannel}}; {ok, NChannel} ->
handle_chan_return({ok, Replies, NChannel}, State) -> {ok, State#state{channel = NChannel}};
{ok, next_msgs(Replies), State#state{channel = NChannel}}; {ok, Replies, NChannel} ->
handle_chan_return({shutdown, Reason, NChannel}, State) -> {ok, next_msgs(Replies), State#state{channel = NChannel}};
shutdown(Reason, State#state{channel = NChannel}); {shutdown, Reason, NChannel} ->
handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) -> shutdown(Reason, State#state{channel = NChannel});
NState = State#state{channel = NChannel}, {shutdown, Reason, Packet, NChannel} ->
ok = handle_outgoing(OutPacket, NState), NState = State#state{channel = NChannel},
shutdown(Reason, NState). ok = handle_outgoing(Packet, NState),
shutdown(Reason, NState)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packets %% Handle outgoing packets
@ -589,9 +588,8 @@ handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]), ?LOG(debug, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State = #state{channel = Channel}) -> handle_info(Info, State) ->
Ret = emqx_channel:handle_info(Info, Channel), with_channel(handle_info, [Info], State).
handle_chan_return(Ret, State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure rate limit %% Ensure rate limit

View File

@ -22,43 +22,34 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, all() -> emqx_ct:all(?MODULE).
recv_oct, recv_cnt, send_oct, send_cnt,
send_pend
]).
all() -> emqx_ct:all(?MODULE) ++ [{group, real_client}].
groups() ->
[{real_client, [non_parallel_tests],
[
g_get_conn_stats,
g_handle_sock_passive
]}].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% CT callbacks %% CT callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init_per_suite(Config) -> init_per_suite(Config) ->
%% Meck Transport
ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]),
%% Meck Channel
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
%% Meck Metrics
ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = meck:unload(emqx_transport),
ok = meck:unload(emqx_channel),
ok = meck:unload(emqx_cm),
ok = meck:unload(emqx_metrics),
ok. ok.
init_per_group(real_client, Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config;
init_per_group(_, Config) -> Config.
end_per_group(real_client, _Config) ->
emqx_ct_helpers:stop_apps([]);
end_per_group(_, Config) -> Config.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
%% Meck Transport
ok = meck:new(emqx_transport, [non_strict, passthrough, no_history]),
ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end), ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end), ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
ok = meck:expect(emqx_transport, ensure_ok_or_exit, ok = meck:expect(emqx_transport, ensure_ok_or_exit,
@ -72,22 +63,9 @@ init_per_testcase(_TestCase, Config) ->
end), end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
%% Meck Channel
ok = meck:new(emqx_channel, [passthrough, no_history]),
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history]),
%% Meck Metrics
ok = meck:new(emqx_metrics, [passthrough, no_history]),
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
Config. Config.
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
ok = meck:unload(emqx_transport),
ok = meck:unload(emqx_channel),
ok = meck:unload(emqx_cm),
ok = meck:unload(emqx_metrics),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -95,9 +73,7 @@ end_per_testcase(_TestCase, Config) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_start_link_ok(_) -> t_start_link_ok(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end).
state = element(1, sys:get_state(CPid))
end).
t_start_link_exit_on_wait(_) -> t_start_link_exit_on_wait(_) ->
ok = exit_on_wait_error(enotconn, normal), ok = exit_on_wait_error(enotconn, normal),
@ -113,123 +89,114 @@ t_start_link_exit_on_activate(_) ->
ok = exit_on_activate_error(econnreset, {shutdown, econnreset}). ok = exit_on_activate_error(econnreset, {shutdown, econnreset}).
t_get_conn_info(_) -> t_get_conn_info(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
#{sockinfo := SockInfo} = emqx_connection:info(CPid), #{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertEqual(#{active_n => 100, ?assertEqual(#{active_n => 100,
peername => {{127,0,0,1},3456}, peername => {{127,0,0,1},3456},
sockname => {{127,0,0,1},1883}, sockname => {{127,0,0,1},1883},
sockstate => running, sockstate => running,
socktype => tcp}, SockInfo) socktype => tcp
end). }, SockInfo)
end).
g_get_conn_stats(_) ->
with_client(fun(CPid) ->
Stats = emqx_connection:stats(CPid),
ct:pal("==== stats: ~p", [Stats]),
[?assert(proplists:get_value(Key, Stats) >= 0) || Key <- ?STATS_KYES]
end, []).
t_handle_call_discard(_) -> t_handle_call_discard(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call, ok = meck:expect(emqx_channel, handle_call,
fun(discard, Channel) -> fun(discard, Channel) ->
{shutdown, discarded, ok, Channel} {shutdown, discarded, ok, Channel}
end), end),
ok = emqx_connection:call(CPid, discard), ok = emqx_connection:call(CPid, discard),
timer:sleep(100), timer:sleep(100),
ok = trap_exit(CPid, {shutdown, discarded}) ok = trap_exit(CPid, {shutdown, discarded})
end, #{trap_exit => true}), end, #{trap_exit => true}),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call, ok = meck:expect(emqx_channel, handle_call,
fun(discard, Channel) -> fun(discard, Channel) ->
{shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel}
end), end),
ok = emqx_connection:call(CPid, discard), ok = emqx_connection:call(CPid, discard),
timer:sleep(100), timer:sleep(100),
ok = trap_exit(CPid, {shutdown, discarded}) ok = trap_exit(CPid, {shutdown, discarded})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_handle_call_takeover(_) -> t_handle_call_takeover(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call, ok = meck:expect(emqx_channel, handle_call,
fun({takeover, 'begin'}, Channel) -> fun({takeover, 'begin'}, Channel) ->
{reply, session, Channel}; {reply, session, Channel};
({takeover, 'end'}, Channel) -> ({takeover, 'end'}, Channel) ->
{shutdown, takeovered, [], Channel} {shutdown, takeovered, [], Channel}
end), end),
session = emqx_connection:call(CPid, {takeover, 'begin'}), session = emqx_connection:call(CPid, {takeover, 'begin'}),
[] = emqx_connection:call(CPid, {takeover, 'end'}), [] = emqx_connection:call(CPid, {takeover, 'end'}),
timer:sleep(100), timer:sleep(100),
ok = trap_exit(CPid, {shutdown, takeovered}) ok = trap_exit(CPid, {shutdown, takeovered})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_handle_call_any(_) -> t_handle_call_any(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call, ok = meck:expect(emqx_channel, handle_call,
fun(_Req, Channel) -> {reply, ok, Channel} end), fun(_Req, Channel) -> {reply, ok, Channel} end),
ok = emqx_connection:call(CPid, req) ok = emqx_connection:call(CPid, req)
end). end).
t_handle_incoming_connect(_) -> t_handle_incoming_connect(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
clientid = <<>>, clientid = <<>>,
clean_start = true, clean_start = true,
keepalive = 60 keepalive = 60
}, },
Frame = make_frame(?CONNECT_PACKET(ConnPkt)), Frame = make_frame(?CONNECT_PACKET(ConnPkt)),
CPid ! {tcp, sock, Frame} CPid ! {tcp, sock, Frame}
end). end).
t_handle_incoming_publish(_) -> t_handle_incoming_publish(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)),
CPid ! {tcp, sock, Frame} CPid ! {tcp, sock, Frame}
end). end).
t_handle_incoming_subscribe(_) -> t_handle_incoming_subscribe(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>, Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
CPid ! {tcp, sock, Frame} CPid ! {tcp, sock, Frame}
end). end).
t_handle_incoming_unsubscribe(_) -> t_handle_incoming_unsubscribe(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>, Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
CPid ! {tcp, sock, Frame} CPid ! {tcp, sock, Frame}
end). end).
t_handle_incoming_undefined(_) -> t_handle_incoming_undefined(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
CPid ! {incoming, undefined} CPid ! {incoming, undefined}
end). end).
t_handle_sock_error(_) -> t_handle_sock_error(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_info, ok = meck:expect(emqx_channel, handle_info,
fun({_, Reason}, Channel) -> fun({_, Reason}, Channel) ->
{shutdown, Reason, Channel} {shutdown, Reason, Channel}
end), end),
%% TODO: fixme later %% TODO: fixme later
CPid ! {tcp_error, sock, econnreset}, CPid ! {tcp_error, sock, econnreset},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, econnreset}) trap_exit(CPid, {shutdown, econnreset})
end, #{trap_exit => true}). end, #{trap_exit => true}).
g_handle_sock_passive(_) ->
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
t_handle_sock_activate(_) -> t_handle_sock_activate(_) ->
with_connection(fun(CPid) -> CPid ! activate_socket end). with_conn(fun(CPid) -> CPid ! activate_socket end).
t_handle_sock_closed(_) -> t_handle_sock_closed(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_info, ok = meck:expect(emqx_channel, handle_info,
fun({sock_closed, Reason}, Channel) -> fun({sock_closed, Reason}, Channel) ->
{shutdown, Reason, Channel} {shutdown, Reason, Channel}
@ -238,157 +205,155 @@ t_handle_sock_closed(_) ->
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, tcp_closed}) trap_exit(CPid, {shutdown, tcp_closed})
end, #{trap_exit => true}), end, #{trap_exit => true}),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_info, ok = meck:expect(emqx_channel, handle_info,
fun({sock_closed, Reason}, Channel) -> fun({sock_closed, Reason}, Channel) ->
{shutdown, Reason, ?DISCONNECT_PACKET(), Channel} {shutdown, Reason, ?DISCONNECT_PACKET(), Channel}
end), end),
CPid ! {tcp_closed, sock}, CPid ! {tcp_closed, sock},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, tcp_closed}) trap_exit(CPid, {shutdown, tcp_closed})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_handle_outgoing(_) -> t_handle_outgoing(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
CPid ! {outgoing, Publish}, CPid ! {outgoing, Publish},
CPid ! {outgoing, ?PUBREL_PACKET(1)}, CPid ! {outgoing, ?PUBREL_PACKET(1)},
CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} CPid ! {outgoing, [?PUBCOMP_PACKET(1)]}
end). end).
t_conn_rate_limit(_) -> t_conn_rate_limit(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)),
CPid ! {tcp, sock, make_frame(Publish)} CPid ! {tcp, sock, make_frame(Publish)}
end, [1, 2]) end, [1, 2])
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) end, #{active_n => 1, rate_limit => {1, 1024}}).
end, #{active_n => 1, rate_limit => {1, 1024}}).
t_conn_pub_limit(_) -> t_conn_pub_limit(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
ok = lists:foreach(fun(I) -> ok = lists:foreach(fun(I) ->
CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)}
end, lists:seq(1, 3)) end, lists:seq(1, 3))
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
end, #{active_n => 1, publish_limit => {1, 2}}). end, #{active_n => 1, publish_limit => {1, 2}}).
t_conn_pingreq(_) -> t_conn_pingreq(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end).
CPid ! {incoming, ?PACKET(?PINGREQ)}
end).
t_inet_reply(_) -> t_inet_reply(_) ->
ok = meck:new(emqx_pd, [passthrough, no_history]), ok = meck:new(emqx_pd, [passthrough, no_history]),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
CPid ! {inet_reply, for_testing, ok}, CPid ! {inet_reply, for_testing, ok},
timer:sleep(100) timer:sleep(100)
end, #{active_n => 1, trap_exit => true}), end, #{active_n => 1, trap_exit => true}),
ok = meck:unload(emqx_pd), ok = meck:unload(emqx_pd),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! {inet_reply, for_testing, {error, for_testing}}, CPid ! {inet_reply, for_testing, {error, for_testing}},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, for_testing}) trap_exit(CPid, {shutdown, for_testing})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_deliver(_) -> t_deliver(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_deliver,
CPid ! {deliver, topic, msg} fun(_, Channel) -> {ok, Channel} end),
end). CPid ! {deliver, topic, msg}
end).
t_event_disconnected(_) -> t_event_disconnected(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
CPid ! {event, disconnected} CPid ! {event, disconnected}
end). end).
t_event_undefined(_) -> t_event_undefined(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
CPid ! {event, undefined} CPid ! {event, undefined}
end). end).
t_cloes(_) -> t_cloes(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! {close, normal}, CPid ! {close, normal},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, normal}) trap_exit(CPid, {shutdown, normal})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_oom_shutdown(_) -> t_oom_shutdown(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! {shutdown, message_queue_too_long}, CPid ! {shutdown, message_queue_too_long},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, message_queue_too_long}) trap_exit(CPid, {shutdown, message_queue_too_long})
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_handle_idle_timeout(_) -> t_handle_idle_timeout(_) ->
ok = emqx_zone:set_env(external, idle_timeout, 10), ok = emqx_zone:set_env(external, idle_timeout, 10),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, idle_timeout}) trap_exit(CPid, {shutdown, idle_timeout})
end, #{zone => external, trap_exit => true}). end, #{zone => external, trap_exit => true}).
t_handle_emit_stats(_) -> t_handle_emit_stats(_) ->
ok = emqx_zone:set_env(external, idle_timeout, 1000), ok = emqx_zone:set_env(external, idle_timeout, 1000),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false,
max_size => ?MAX_PACKET_SIZE, max_size => ?MAX_PACKET_SIZE,
version => ?MQTT_PROTO_V4 version => ?MQTT_PROTO_V4
})}, })},
timer:sleep(1000) timer:sleep(1000)
end,#{zone => external, trap_exit => true}). end,#{zone => external, trap_exit => true}).
t_handle_limit_timeout(_) -> t_handle_limit_timeout(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! {timeout, undefined, limit_timeout}, CPid ! {timeout, undefined, limit_timeout},
timer:sleep(100), timer:sleep(100),
false = erlang:is_process_alive(CPid) true = erlang:is_process_alive(CPid)
end, #{trap_exit => true}). end).
t_handle_keepalive_timeout(_) -> t_handle_keepalive_timeout(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_timeout, ok = meck:expect(emqx_channel, handle_timeout,
fun(_TRef, _TMsg, Channel) -> fun(_TRef, _TMsg, Channel) ->
{shutdown, keepalive_timeout, Channel} {shutdown, keepalive_timeout, Channel}
end), end),
CPid ! {timeout, make_ref(), keepalive}, CPid ! {timeout, make_ref(), keepalive},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, {shutdown, keepalive_timeout}) trap_exit(CPid, {shutdown, keepalive_timeout})
end, #{trap_exit => true}), end, #{trap_exit => true}),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
ok = meck:expect(emqx_channel, handle_timeout, ok = meck:expect(emqx_channel, handle_timeout,
fun(_TRef, _TMsg, Channel) -> fun(_TRef, _TMsg, Channel) ->
{shutdown, keepalive_timeout, Channel} {shutdown, keepalive_timeout, Channel}
end), end),
CPid ! {timeout, make_ref(), keepalive}, CPid ! {timeout, make_ref(), keepalive},
timer:sleep(100), timer:sleep(100),
false = erlang:is_process_alive(CPid) false = erlang:is_process_alive(CPid)
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_handle_shutdown(_) -> t_handle_shutdown(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! Shutdown = {shutdown, reason}, CPid ! Shutdown = {shutdown, reason},
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, Shutdown) trap_exit(CPid, Shutdown)
end, #{trap_exit => true}). end, #{trap_exit => true}).
t_exit_message(_) -> t_exit_message(_) ->
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
CPid ! {'EXIT', CPid, for_testing}, CPid ! {'EXIT', CPid, for_testing},
timer:sleep(1000) timer:sleep(1000)
end, #{trap_exit => true}). end, #{trap_exit => true}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
@ -399,27 +364,28 @@ exit_on_wait_error(SockErr, Reason) ->
fun(_Sock) -> fun(_Sock) ->
{error, SockErr} {error, SockErr}
end), end),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, Reason) trap_exit(CPid, Reason)
end, #{trap_exit => true}). end, #{trap_exit => true}).
exit_on_activate_error(SockErr, Reason) -> exit_on_activate_error(SockErr, Reason) ->
ok = meck:expect(emqx_transport, setopts, ok = meck:expect(emqx_transport, setopts,
fun(_Sock, _Opts) -> fun(_Sock, _Opts) ->
{error, SockErr} {error, SockErr}
end), end),
with_connection(fun(CPid) -> with_conn(fun(CPid) ->
timer:sleep(100), timer:sleep(100),
trap_exit(CPid, Reason) trap_exit(CPid, Reason)
end, #{trap_exit => true}). end, #{trap_exit => true}).
with_connection(TestFun) -> with_conn(TestFun) ->
with_connection(TestFun, #{trap_exit => false}). with_conn(TestFun, #{trap_exit => false}).
with_connection(TestFun, Options) when is_map(Options) -> with_conn(TestFun, Options) when is_map(Options) ->
with_connection(TestFun, maps:to_list(Options)); with_conn(TestFun, maps:to_list(Options));
with_connection(TestFun, Options) ->
with_conn(TestFun, Options) ->
TrapExit = proplists:get_value(trap_exit, Options, false), TrapExit = proplists:get_value(trap_exit, Options, false),
process_flag(trap_exit, TrapExit), process_flag(trap_exit, TrapExit),
{ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options), {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options),
@ -427,18 +393,6 @@ with_connection(TestFun, Options) ->
TrapExit orelse emqx_connection:stop(CPid), TrapExit orelse emqx_connection:stop(CPid),
ok. ok.
with_client(TestFun, _Options) ->
ClientId = <<"t_conn">>,
{ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
timer:sleep(50),
case emqx_cm:lookup_channels(ClientId) of
[] -> ct:fail({client_not_started, ClientId});
[ChanPid] ->
TestFun(ChanPid),
emqtt:stop(C)
end.
trap_exit(Pid, Reason) -> trap_exit(Pid, Reason) ->
receive receive
{'EXIT', Pid, Reason} -> ok; {'EXIT', Pid, Reason} -> ok;

View File

@ -14,23 +14,40 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_msg_expiry_interval_SUITE). -module(emqx_mqtt_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
recv_oct, recv_cnt, send_oct, send_cnt,
send_pend
]).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
t_conn_stats(_) ->
with_client(fun(CPid) ->
Stats = emqx_connection:stats(CPid),
ct:pal("==== stats: ~p", [Stats]),
[?assert(proplists:get_value(Key, Stats) >= 0) || Key <- ?STATS_KYES]
end, []).
t_tcp_sock_passive(_) ->
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
t_message_expiry_interval_1(_) -> t_message_expiry_interval_1(_) ->
ClientA = message_expiry_interval_init(), ClientA = message_expiry_interval_init(),
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
@ -92,3 +109,16 @@ message_expiry_interval_not_exipred(ClientA, QoS) ->
ct:fail(no_publish_received) ct:fail(no_publish_received)
end, end,
emqtt:stop(ClientB1). emqtt:stop(ClientB1).
with_client(TestFun, _Options) ->
ClientId = <<"t_conn">>,
{ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
timer:sleep(50),
case emqx_cm:lookup_channels(ClientId) of
[] -> ct:fail({client_not_started, ClientId});
[ChanPid] ->
TestFun(ChanPid),
emqtt:stop(C)
end.