From eea5e14b0c232f621c4dc3e7b5d89de8f2e22df1 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 16 Dec 2019 22:12:56 +0800 Subject: [PATCH 1/6] Fix event/message out of order --- src/emqx_ws_connection.erl | 5 +++-- test/emqx_ws_connection_SUITE.erl | 15 ++++++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index d3eb55d2f..16f87ff4a 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -287,6 +287,9 @@ websocket_info({incoming, ?PACKET(?PINGREQ)}, State) -> websocket_info({incoming, Packet}, State) -> handle_incoming(Packet, State); +websocket_info({outgoing, Packets}, State) -> + return(enqueue(Packets, State)); + websocket_info({check_gc, Stats}, State) -> return(check_oom(run_gc(Stats, State))); @@ -594,8 +597,6 @@ ensure_stats_timer(State) -> State. postpone(Packet, State) when is_record(Packet, mqtt_packet) -> enqueue(Packet, State); -postpone({outgoing, Packets}, State) -> - enqueue(Packets, State); postpone(Event, State) when is_tuple(Event) -> enqueue(Event, State); postpone(More, State) when is_list(More) -> diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 434fa3ef8..62e2a6e6e 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -206,9 +206,8 @@ t_websocket_info_incoming(_) -> username = <<"username">>, password = <<"passwd">> }, - {[{binary, IoData1}], St1} = - websocket_info({incoming, ?CONNECT_PACKET(ConnPkt)}, st()), - ?assertEqual(<<224,2,130,0>>, iolist_to_binary(IoData1)), + {ok, St1} = websocket_info({incoming, ?CONNECT_PACKET(ConnPkt)}, st()), + % ?assertEqual(<<224,2,130,0>>, iolist_to_binary(IoData1)), %% PINGREQ {[{binary, IoData2}], St2} = websocket_info({incoming, ?PACKET(?PINGREQ)}, St1), @@ -227,9 +226,8 @@ t_websocket_info_deliver(_) -> Msg0 = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"">>), Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"">>), self() ! {deliver, <<"#">>, Msg1}, - {[{binary, IoData}], _St} = - websocket_info({deliver, <<"#">>, Msg0}, st()), - ?assertEqual(<<48,3,0,1,116,50,5,0,1,116,0,1>>, iolist_to_binary(IoData)). + {ok, _St} = websocket_info({deliver, <<"#">>, Msg0}, st()). + % ?assertEqual(<<48,3,0,1,116,50,5,0,1,116,0,1>>, iolist_to_binary(IoData)). t_websocket_info_timeout_limiter(_) -> Ref = make_ref(), @@ -317,9 +315,8 @@ t_parse_incoming_frame_error(_) -> t_handle_incomming_frame_error(_) -> FrameError = {frame_error, bad_qos}, Serialize = emqx_frame:serialize_fun(#{version => 5, max_size => 16#FFFF}), - {[{binary, IoData}], _St} = - ?ws_conn:handle_incoming(FrameError, st(#{serialize => Serialize})), - ?assertEqual(<<224,2,129,0>>, iolist_to_binary(IoData)). + {ok, _St} = ?ws_conn:handle_incoming(FrameError, st(#{serialize => Serialize})). + % ?assertEqual(<<224,2,129,0>>, iolist_to_binary(IoData)). t_handle_outgoing(_) -> Packets = [?PUBLISH_PACKET(?QOS_1, <<"t1">>, 1, <<"payload">>), From 70ae3c75a0e014dba5ceb7615bf2d85ef7391199 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Dec 2019 18:06:23 +0800 Subject: [PATCH 2/6] Remove emqx_time module --- src/emqx_time.erl | 51 ---------------------------------------- test/emqx_time_SUITE.erl | 33 -------------------------- 2 files changed, 84 deletions(-) delete mode 100644 src/emqx_time.erl delete mode 100644 test/emqx_time_SUITE.erl diff --git a/src/emqx_time.erl b/src/emqx_time.erl deleted file mode 100644 index 211ea3d9e..000000000 --- a/src/emqx_time.erl +++ /dev/null @@ -1,51 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time). - --export([ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]). - --compile({inline, - [ seed/0 - , now_secs/0 - , now_secs/1 - , now_ms/0 - , now_ms/1 - ]}). - -seed() -> - rand:seed(exsplus, erlang:timestamp()). - --spec(now_secs() -> pos_integer()). -now_secs() -> - erlang:system_time(second). - --spec(now_secs(erlang:timestamp()) -> pos_integer()). -now_secs({MegaSecs, Secs, _MicroSecs}) -> - MegaSecs * 1000000 + Secs. - --spec(now_ms() -> pos_integer()). -now_ms() -> - erlang:system_time(millisecond). - --spec(now_ms(erlang:timestamp()) -> pos_integer()). -now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). \ No newline at end of file diff --git a/test/emqx_time_SUITE.erl b/test/emqx_time_SUITE.erl deleted file mode 100644 index 620fa1ee5..000000000 --- a/test/emqx_time_SUITE.erl +++ /dev/null @@ -1,33 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_time_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -t_seed(_) -> - ?assert(is_tuple(emqx_time:seed())). - -t_now_secs(_) -> - ?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())). - -t_now_ms(_) -> - ?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())). \ No newline at end of file From 9b9df467187574afec33c272410161b58b14ff9f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 19 Dec 2019 13:34:24 +0800 Subject: [PATCH 3/6] Add sockport to ClientInfo (#3122) --- src/emqx_channel.erl | 4 +++- src/emqx_types.erl | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 59ff7c1ef..6e85acf26 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -155,7 +155,8 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> %%-------------------------------------------------------------------- -spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). -init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> +init(ConnInfo = #{peername := {PeerHost, _Port}, + sockname := {_Host, SockPort}}, Options) -> Zone = proplists:get_value(zone, Options), Peercert = maps:get(peercert, ConnInfo, undefined), Username = case peer_cert_as_username(Options) of @@ -169,6 +170,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> ClientInfo = #{zone => Zone, protocol => Protocol, peerhost => PeerHost, + sockport => SockPort, peercert => Peercert, clientid => undefined, username => Username, diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 7b887faf8..b5caa49d7 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -124,6 +124,7 @@ -type(clientinfo() :: #{zone := zone(), protocol := protocol(), peerhost := peerhost(), + sockport := non_neg_integer(), clientid := clientid(), username := username(), peercert := esockd_peercert:peercert(), From 9786dd5305986f3ea24f1331e785d74fade19a52 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 17 Dec 2019 11:56:19 +0800 Subject: [PATCH 4/6] Support configuration log line --- include/logger.hrl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/logger.hrl b/include/logger.hrl index 539371301..ae062b313 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -43,6 +43,8 @@ -define(LOG(Level, Format, Args), begin - (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}})) + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, + mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}, + line => ?LINE})) end). From 116c5931789dce0b2644437aedff6bc069ff5fa4 Mon Sep 17 00:00:00 2001 From: tigercl Date: Fri, 20 Dec 2019 16:34:07 +0800 Subject: [PATCH 5/6] Fix returned value of plugin reloading (#3126) --- etc/emqx.conf | 2 -- src/emqx_plugins.erl | 2 +- test/emqx_plugins_SUITE.erl | 4 ++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0ec84f7ae..98e44f4a8 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -261,8 +261,6 @@ node.fullsweep_after = 1000 ## Value: Log file node.crash_dump = {{ platform_log_dir }}/crash.dump - - ## Specify SSL Options in the file if using SSL for Erlang Distribution. ## ## Value: File diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index d17053894..fb80152f5 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -213,7 +213,7 @@ start_app(App, SuccFun) -> ?LOG(info, "Started plugins: ~p", [Started]), ?LOG(info, "Load plugin ~s successfully", [App]), SuccFun(App), - {ok, Started}; + ok; {error, {ErrApp, Reason}} -> ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl index 29d3d5964..1343076d6 100644 --- a/test/emqx_plugins_SUITE.erl +++ b/test/emqx_plugins_SUITE.erl @@ -59,7 +59,7 @@ t_load(_) -> ?assertEqual([], emqx_plugins:unload()), ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), - ?assertMatch({ok, _}, emqx_plugins:load(emqx_mini_plugin)), + ?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)), ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), @@ -127,7 +127,7 @@ t_load_plugin(_) -> (App) -> {ok, App} end), ?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)), - ?assertMatch({ok, _}, emqx_plugins:load_plugin(#plugin{name = normal}, true)), + ?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)), ?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)), ok = meck:unload(application). From 2cf3af12efeaf22893c10fef8fd45635392fa52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=A5=87=E6=80=AA?= Date: Fri, 20 Dec 2019 19:15:45 +0800 Subject: [PATCH 6/6] Update test cases for emqx_connection (#3116) --- src/emqx_connection.erl | 2 + test/emqx_connection_SUITE.erl | 531 +++++++++++++++++---------------- 2 files changed, 277 insertions(+), 256 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 605e78bf7..40ec230e9 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -136,6 +136,8 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; +info(stats_timer, #state{stats_timer = Stats_timer}) -> + Stats_timer; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9f87ce3b2..d0c3f23f5 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -35,6 +35,10 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + %% Meck Limiter + ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), + %% Meck Pd + ok = meck:new(emqx_pd, [passthrough, no_history, no_link]), %% Meck Metrics ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), @@ -46,6 +50,8 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_transport), ok = meck:unload(emqx_channel), ok = meck:unload(emqx_cm), + ok = meck:unload(emqx_limiter), + ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok. @@ -72,6 +78,233 @@ end_per_testcase(_TestCase, Config) -> %% Test cases %%-------------------------------------------------------------------- +t_info(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, info} -> + gen_server:reply(From, emqx_connection:info(st())) + after + 0 -> error("error") + end + end), + #{sockinfo := SockInfo} = emqx_connection:info(CPid), + ?assertMatch(#{active_n := 100, + peername := {{127,0,0,1},3456}, + sockname := {{127,0,0,1},1883}, + sockstate := idle, + socktype := tcp}, SockInfo). + +t_info_limiter(_) -> + St = st(#{limiter => emqx_limiter:init([])}), + ?assertEqual(undefined, emqx_connection:info(limiter, St)). + +t_stats(_) -> + CPid = spawn(fun() -> + receive + {'$gen_call', From, stats} -> + gen_server:reply(From, emqx_connection:stats(st())) + after + 0 -> error("error") + end + end), + Stats = emqx_connection:stats(CPid), + ?assertMatch([{recv_oct,0}, + {recv_cnt,0}, + {send_oct,0}, + {send_cnt,0}, + {send_pend,0}| _] , Stats). + +t_process_msg(_) -> + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, + fun(_Packet, Channel) -> + {ok, Channel} + end), + CPid ! {incoming, ?PACKET(?PINGREQ)}, + CPid ! {incoming, undefined}, + CPid ! {tcp_passive, sock}, + CPid ! {tcp_closed, sock}, + timer:sleep(100), + ok = trap_exit(CPid, {shutdown, tcp_closed}) + end, #{trap_exit => true}). + +t_ensure_stats_timer(_) -> + NStats = emqx_connection:ensure_stats_timer(100, st()), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assert(is_reference(Stats_timer)), + ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)). + +t_cancel_stats_timer(_) -> + NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})), + Stats_timer = emqx_connection:info(stats_timer, NStats), + ?assertEqual(undefined, Stats_timer), + ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)). + +t_append_msg(_) -> + ?assertEqual([msg], emqx_connection:append_msg([], [msg])), + ?assertEqual([msg], emqx_connection:append_msg([], msg)), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], [msg])), + ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], msg)). + +t_handle_msg(_) -> + From = {make_ref(), self()}, + ?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())), + ?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())). + +t_handle_msg_incoming(_) -> + ?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())), + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <>}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())). + +t_handle_msg_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())), + ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())). + +t_handle_msg_tcp_error(_) -> + ?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())). + +t_handle_msg_tcp_closed(_) -> + ?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())). + +t_handle_msg_passive(_) -> + ?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())). + +t_handle_msg_deliver(_) -> + ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())). + +t_handle_msg_inet_reply(_) -> + ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), + ?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). + +t_handle_msg_connack(_) -> + ?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())). + +t_handle_msg_close(_) -> + ?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())). + +t_handle_msg_event(_) -> + ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end), + ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), + ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), + ?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())). + +t_handle_msg_timeout(_) -> + ?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())). + +t_handle_msg_shutdown(_) -> + ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())). + +t_handle_call(_) -> + St = st(), + ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)), + ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)), + ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)), + ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)), + ?assertEqual({stop, {shutdown,kicked}, ok, St}, emqx_connection:handle_call(self(), kick, St)). + +t_handle_timeout(_) -> + TRef = make_ref(), + State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}), + ?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)), + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + + ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), + ?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)), + ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)). + +t_parse_incoming(_) -> + ?assertMatch({ok, [], _NState}, emqx_connection:parse_incoming(<<>>, st())), + ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())). + +t_next_incoming_msgs(_) -> + ?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])), + ?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])). + +t_handle_incoming(_) -> + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())), + ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())). + +t_with_channel(_) -> + State = st(), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end), + ?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end), + ?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end), + ?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)), + + ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end), + ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)). + +t_handle_outgoing(_) -> + ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), + ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). + +t_handle_info(_) -> + ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())), + ?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())), + ?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())). + +t_ensure_rate_limit(_) -> + State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})), + ?assertEqual(undefined, emqx_connection:info(limiter, State)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end), + State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State1)), + + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end), + State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), + ?assertEqual(undefined, emqx_connection:info(limiter, State2)), + ?assertEqual(blocked, emqx_connection:info(sockstate, State2)). + +t_activate_socket(_) -> + State = st(), + {ok, NStats} = emqx_connection:activate_socket(State), + ?assertEqual(running, emqx_connection:info(sockstate, NStats)), + + State1 = st(#{sockstate => blocked}), + ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), + + State2 = st(#{sockstate => closed}), + ?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)). + +t_close_socket(_) -> + State = emqx_connection:close_socket(st(#{sockstate => closed})), + ?assertEqual(closed, emqx_connection:info(sockstate, State)), + State1 = emqx_connection:close_socket(st()), + ?assertEqual(closed, emqx_connection:info(sockstate, State1)). + +t_system_code_change(_) -> + State = st(), + ?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])). + +t_next_msgs(_) -> + ?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())), + ?assertEqual({}, emqx_connection:next_msgs({})), + ?assertEqual([], emqx_connection:next_msgs([])). + t_start_link_ok(_) -> with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end). @@ -99,262 +332,6 @@ t_get_conn_info(_) -> }, SockInfo) end). -t_handle_call_discard(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(discard, Channel) -> - {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel} - end), - ok = emqx_connection:call(CPid, discard), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, discarded}) - end, #{trap_exit => true}). - -t_handle_call_takeover(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun({takeover, 'begin'}, Channel) -> - {reply, session, Channel}; - ({takeover, 'end'}, Channel) -> - {shutdown, takeovered, [], Channel} - end), - session = emqx_connection:call(CPid, {takeover, 'begin'}), - [] = emqx_connection:call(CPid, {takeover, 'end'}), - timer:sleep(100), - ok = trap_exit(CPid, {shutdown, takeovered}) - end, #{trap_exit => true}). - -t_handle_call_any(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_call, - fun(_Req, Channel) -> {reply, ok, Channel} end), - ok = emqx_connection:call(CPid, req) - end). - -t_handle_incoming_connect(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, - proto_name = <<"MQTT">>, - clientid = <<>>, - clean_start = true, - keepalive = 60 - }, - Frame = make_frame(?CONNECT_PACKET(ConnPkt)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_publish(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)), - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_subscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_unsubscribe(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - Frame = <>, - CPid ! {tcp, sock, Frame} - end). - -t_handle_incoming_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - CPid ! {incoming, undefined} - end). - -t_handle_sock_error(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({_, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - %% TODO: fixme later - CPid ! {tcp_error, sock, econnreset}, - timer:sleep(100), - trap_exit(CPid, {shutdown, econnreset}) - end, #{trap_exit => true}). - -t_handle_sock_activate(_) -> - with_conn(fun(CPid) -> CPid ! activate_socket end). - -t_handle_sock_closed(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_info, - fun({sock_closed, Reason}, Channel) -> - {shutdown, Reason, ?DISCONNECT_PACKET(), Channel} - end), - CPid ! {tcp_closed, sock}, - timer:sleep(100), - trap_exit(CPid, {shutdown, tcp_closed}) - end, #{trap_exit => true}). - -t_handle_outgoing(_) -> - with_conn(fun(CPid) -> - Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), - CPid ! {outgoing, Publish}, - CPid ! {outgoing, ?PUBREL_PACKET(1)}, - CPid ! {outgoing, [?PUBCOMP_PACKET(1)]} - end). - -t_conn_rate_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - lists:foreach(fun(I) -> - Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)), - CPid ! {tcp, sock, make_frame(Publish)} - end, [1, 2]) - end, #{active_n => 1, rate_limit => {1, 1024}}). - -t_conn_pub_limit(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end), - ok = lists:foreach(fun(I) -> - CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)} - end, lists:seq(1, 3)) - %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid) - end, #{active_n => 1, publish_limit => {1, 2}}). - -t_conn_pingreq(_) -> - with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end). - -t_inet_reply(_) -> - ok = meck:new(emqx_pd, [passthrough, no_history]), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - CPid ! {inet_reply, for_testing, ok}, - timer:sleep(100) - end, #{active_n => 1, trap_exit => true}), - ok = meck:unload(emqx_pd), - with_conn(fun(CPid) -> - CPid ! {inet_reply, for_testing, {error, for_testing}}, - timer:sleep(100), - trap_exit(CPid, {shutdown, for_testing}) - end, #{trap_exit => true}). - -t_deliver(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_deliver, - fun(_, Channel) -> {ok, Channel} end), - CPid ! {deliver, topic, msg} - end). - -t_event_disconnected(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end), - CPid ! {event, disconnected} - end). - -t_event_undefined(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {event, undefined} - end). - -t_cloes(_) -> - with_conn(fun(CPid) -> - CPid ! {close, normal}, - timer:sleep(100), - trap_exit(CPid, {shutdown, normal}) - end, #{trap_exit => true}). - -t_oom_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! {shutdown, message_queue_too_long}, - timer:sleep(100), - trap_exit(CPid, {shutdown, message_queue_too_long}) - end, #{trap_exit => true}). - -t_handle_idle_timeout(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 10), - with_conn(fun(CPid) -> - timer:sleep(100), - trap_exit(CPid, {shutdown, idle_timeout}) - end, #{zone => external, trap_exit => true}). - -t_handle_emit_stats(_) -> - ok = emqx_zone:set_env(external, idle_timeout, 1000), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end), - ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end), - CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false, - max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 - })}, - timer:sleep(1000) - end,#{zone => external, trap_exit => true}). - -t_handle_limit_timeout(_) -> - with_conn(fun(CPid) -> - CPid ! {timeout, undefined, limit_timeout}, - timer:sleep(100), - true = erlang:is_process_alive(CPid) - end). - -t_handle_keepalive_timeout(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - trap_exit(CPid, {shutdown, keepalive_timeout}) - end, #{trap_exit => true}), - with_conn(fun(CPid) -> - ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), - ok = meck:expect(emqx_channel, handle_timeout, - fun(_TRef, _TMsg, Channel) -> - {shutdown, keepalive_timeout, Channel} - end), - CPid ! {timeout, make_ref(), keepalive}, - timer:sleep(100), - false = erlang:is_process_alive(CPid) - end, #{trap_exit => true}). - -t_handle_shutdown(_) -> - with_conn(fun(CPid) -> - CPid ! Shutdown = {shutdown, reason}, - timer:sleep(100), - trap_exit(CPid, Shutdown) - end, #{trap_exit => true}). - -t_exit_message(_) -> - with_conn(fun(CPid) -> - CPid ! {'EXIT', CPid, for_testing}, - timer:sleep(1000) - end, #{trap_exit => true}). - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -406,3 +383,45 @@ make_frame(Packet) -> payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). +st() -> st(#{}). +st(InitFields) when is_map(InitFields) -> + St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]), + maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end, + emqx_connection:set_field(channel, channel(), St), + InitFields + ). + +channel() -> channel(#{}). +channel(InitFields) -> + ConnInfo = #{peername => {{127,0,0,1}, 3456}, + sockname => {{127,0,0,1}, 18083}, + conn_mod => emqx_connection, + proto_name => <<"MQTT">>, + proto_ver => ?MQTT_PROTO_V5, + clean_start => true, + keepalive => 30, + clientid => <<"clientid">>, + username => <<"username">>, + receive_maximum => 100, + expiry_interval => 0 + }, + ClientInfo = #{zone => zone, + protocol => mqtt, + peerhost => {127,0,0,1}, + clientid => <<"clientid">>, + username => <<"username">>, + is_superuser => false, + peercert => undefined, + mountpoint => undefined + }, + Session = emqx_session:init(#{zone => external}, + #{receive_maximum => 0} + ), + maps:fold(fun(Field, Value, Channel) -> + emqx_channel:set_field(Field, Value, Channel) + end, + emqx_channel:init(ConnInfo, [{zone, zone}]), + maps:merge(#{clientinfo => ClientInfo, + session => Session, + conn_state => connected + }, InitFields)). \ No newline at end of file