From 042ff2e0d7b85654c3fdf7a91c88379bb4458ce8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 10 Jul 2021 18:01:45 +0800 Subject: [PATCH] fix(test): update test cases for emqx_connection_SUITE --- apps/emqx/src/emqx_misc.erl | 2 +- apps/emqx/src/emqx_session.erl | 35 +++++++++--------- apps/emqx/test/emqx_channel_SUITE.erl | 7 ++-- apps/emqx/test/emqx_connection_SUITE.erl | 45 +++++++++++------------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 640ed5704..d45b6f7ce 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -198,7 +198,7 @@ check_oom(Policy) -> -spec(check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}). check_oom(_Pid, #{enable := false}) -> ok; -check_oom(Pid, #{message_queue_len := MaxQLen, +check_oom(Pid, #{max_message_queue_len := MaxQLen, max_heap_size := MaxHeapSize}) -> case process_info(Pid, [message_queue_len, total_heap_size]) of undefined -> ok; diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 9463345d4..995aff713 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -92,8 +92,6 @@ -export_type([session/0]). --import(emqx_zone, [get_env/3]). - -record(session, { %% Client’s Subscriptions. subscriptions :: map(), @@ -159,27 +157,28 @@ %%-------------------------------------------------------------------- -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). -init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> - #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), +init(#{zone := Zone, listener := Listener}, #{receive_maximum := MaxInflight}) -> + #session{max_subscriptions = get_conf(Zone, Listener, max_subscriptions), subscriptions = #{}, - upgrade_qos = get_env(Zone, upgrade_qos, false), + upgrade_qos = get_conf(Zone, Listener, upgrade_qos), inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone), + mqueue = init_mqueue(Zone, Listener), next_pkt_id = 1, - retry_interval = timer:seconds(get_env(Zone, retry_interval, 0)), + retry_interval = timer:seconds(get_conf(Zone, Listener, retry_interval)), awaiting_rel = #{}, - max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), - await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), + max_awaiting_rel = get_conf(Zone, Listener, max_awaiting_rel), + await_rel_timeout = timer:seconds(get_conf(Zone, Listener, await_rel_timeout)), created_at = erlang:system_time(millisecond) }. %% @private init mq -init_mqueue(Zone) -> - emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), - store_qos0 => get_env(Zone, mqueue_store_qos0, true), - priorities => get_env(Zone, mqueue_priorities, none), - default_priority => get_env(Zone, mqueue_default_priority, lowest) - }). +init_mqueue(Zone, Listener) -> + emqx_mqueue:init(#{ + max_len => get_conf(Zone, Listener, max_mqueue_len), + store_qos0 => get_conf(Zone, Listener, mqueue_store_qos0), + priorities => get_conf(Zone, Listener, mqueue_priorities), + default_priority => get_conf(Zone, Listener, mqueue_default_priority) + }). %%-------------------------------------------------------------------- %% Info, Stats @@ -253,7 +252,7 @@ subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, end. -compile({inline, [is_subscriptions_full/1]}). -is_subscriptions_full(#session{max_subscriptions = 0}) -> +is_subscriptions_full(#session{max_subscriptions = infinity}) -> false; is_subscriptions_full(#session{subscriptions = Subs, max_subscriptions = MaxLimit}) -> @@ -302,7 +301,7 @@ publish(_PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. -compile({inline, [is_awaiting_full/1]}). -is_awaiting_full(#session{max_awaiting_rel = 0}) -> +is_awaiting_full(#session{max_awaiting_rel = infinity}) -> false; is_awaiting_full(#session{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLimit}) -> @@ -697,3 +696,5 @@ set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). +get_conf(Zone, Listener, Key) -> + emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 2b2b628ad..4d108bb94 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -151,7 +151,7 @@ default_zone_conf() -> conn_congestion => #{enable_alarm => true, min_alarm_sustain_duration => 60000}, flapping_detect => - #{ban_time => 300000,enable => true, + #{ban_time => 300000,enable => false, max_count => 15,window_time => 60000}, force_gc => #{bytes => 16777216,count => 16000, @@ -168,6 +168,9 @@ default_zone_conf() -> } }. +set_default_zone_conf() -> + emqx_config:put(default_zone_conf()). + %%-------------------------------------------------------------------- %% CT Callbacks %%-------------------------------------------------------------------- @@ -207,7 +210,7 @@ end_per_suite(_Config) -> ]). init_per_testcase(_TestCase, Config) -> - emqx_config:put(default_zone_conf()), + set_default_zone_conf(), Config. end_per_testcase(_TestCase, Config) -> diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index a6b2b614a..3e6281fc0 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -57,6 +57,7 @@ init_per_suite(Config) -> ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end), + emqx_channel_SUITE:set_default_zone_conf(), Config. end_per_suite(_Config) -> @@ -120,14 +121,13 @@ t_info(_) -> end end), #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertMatch(#{active_n := 100, - peername := {{127,0,0,1},3456}, + ?assertMatch(#{ peername := {{127,0,0,1},3456}, sockname := {{127,0,0,1},1883}, sockstate := idle, socktype := tcp}, SockInfo). t_info_limiter(_) -> - St = st(#{limiter => emqx_limiter:init(external, [])}), + St = st(#{limiter => emqx_limiter:init(default, [])}), ?assertEqual(undefined, emqx_connection:info(limiter, St)). t_stats(_) -> @@ -219,8 +219,10 @@ t_handle_msg_deliver(_) -> t_handle_msg_inet_reply(_) -> ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), - ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), + emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 0), + ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())), + emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 100), + ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())), ?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). @@ -331,12 +333,12 @@ t_ensure_rate_limit(_) -> ?assertEqual(undefined, emqx_connection:info(limiter, State)), ok = meck:expect(emqx_limiter, check, - fun(_, _) -> {ok, emqx_limiter:init(external, [])} end), + fun(_, _) -> {ok, emqx_limiter:init(default, [])} end), State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), ?assertEqual(undefined, emqx_connection:info(limiter, State1)), ok = meck:expect(emqx_limiter, check, - fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end), + fun(_, _) -> {pause, 3000, emqx_limiter:init(default, [])} end), State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), ?assertEqual(undefined, emqx_connection:info(limiter, State2)), ?assertEqual(blocked, emqx_connection:info(sockstate, State2)). @@ -386,8 +388,7 @@ t_start_link_exit_on_activate(_) -> t_get_conn_info(_) -> with_conn(fun(CPid) -> #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertEqual(#{active_n => 100, - peername => {{127,0,0,1},3456}, + ?assertEqual(#{peername => {{127,0,0,1},3456}, sockname => {{127,0,0,1},1883}, sockstate => running, socktype => tcp @@ -397,16 +398,12 @@ t_get_conn_info(_) -> t_oom_shutdown(init, Config) -> ok = snabbkaffe:start_trace(), ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]), - ok = meck:new(emqx_zone, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_zone, oom_policy, - fun(_Zone) -> #{message_queue_len => 10, max_heap_size => 8000000} end), meck:expect(emqx_misc, check_oom, fun(_) -> {shutdown, "fake_oom"} end), Config; t_oom_shutdown('end', _Config) -> snabbkaffe:stop(), meck:unload(emqx_misc), - meck:unload(emqx_zone), ok. t_oom_shutdown(_) -> @@ -455,13 +452,11 @@ exit_on_activate_error(SockErr, Reason) -> with_conn(TestFun) -> with_conn(TestFun, #{trap_exit => false}). -with_conn(TestFun, Options) when is_map(Options) -> - with_conn(TestFun, maps:to_list(Options)); - -with_conn(TestFun, Options) -> - TrapExit = proplists:get_value(trap_exit, Options, false), +with_conn(TestFun, Opts) when is_map(Opts) -> + TrapExit = maps:get(trap_exit, Opts, false), process_flag(trap_exit, TrapExit), - {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options), + {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, + maps:merge(Opts, #{zone => default, listener => mqtt_tcp})), TestFun(CPid), TrapExit orelse emqx_connection:stop(CPid), ok. @@ -483,7 +478,8 @@ st() -> st(#{}, #{}). st(InitFields) when is_map(InitFields) -> st(InitFields, #{}). st(InitFields, ChannelFields) when is_map(InitFields) -> - St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]), + St = emqx_connection:init_state(emqx_transport, sock, #{zone => default, + listener => mqtt_tcp}), maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end, emqx_connection:set_field(channel, channel(ChannelFields), St), InitFields @@ -503,7 +499,8 @@ channel(InitFields) -> receive_maximum => 100, expiry_interval => 0 }, - ClientInfo = #{zone => zone, + ClientInfo = #{zone => default, + listener => mqtt_tcp, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, @@ -512,13 +509,13 @@ channel(InitFields) -> peercert => undefined, mountpoint => undefined }, - Session = emqx_session:init(#{zone => external}, + Session = emqx_session:init(#{zone => default, listener => mqtt_tcp}, #{receive_maximum => 0} ), maps:fold(fun(Field, Value, Channel) -> - emqx_channel:set_field(Field, Value, Channel) + emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, [{zone, zone}]), + emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected