diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 86f565ea7..6b468c480 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -42,8 +42,8 @@ mqtt_conf() -> max_topic_alias => 65535, max_topic_levels => 65535, mountpoint => <<>>, - mqueue_default_priority => highest, - mqueue_priorities => [], + mqueue_default_priority => lowest, + mqueue_priorities => #{}, mqueue_store_qos0 => true, peer_cert_as_clientid => disabled, peer_cert_as_username => disabled, diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 87dd66183..67d06c281 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -29,6 +29,7 @@ all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- init_per_suite(Config) -> + emqx_channel_SUITE:set_default_zone_conf(), ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker], [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), @@ -59,7 +60,7 @@ t_session_init(_) -> ?assertEqual(0, emqx_session:info(inflight_cnt, Session)), ?assertEqual(64, emqx_session:info(inflight_max, Session)), ?assertEqual(1, emqx_session:info(next_pkt_id, Session)), - ?assertEqual(0, emqx_session:info(retry_interval, Session)), + ?assertEqual(30, emqx_session:info(retry_interval, Session)), ?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)), ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)), @@ -100,7 +101,7 @@ t_subscribe(_) -> ?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)). t_is_subscriptions_full_false(_) -> - Session = session(#{max_subscriptions => 0}), + Session = session(#{max_subscriptions => infinity}), ?assertNot(emqx_session:is_subscriptions_full(Session)). t_is_subscriptions_full_true(_) -> diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index cfa45f1ad..73e84633a 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -55,13 +55,6 @@ init_per_testcase(TestCase, Config) when ok = meck:expect(cowboy_req, sock, fun(_) -> {{127,0,0,1}, 18083} end), ok = meck:expect(cowboy_req, cert, fun(_) -> undefined end), ok = meck:expect(cowboy_req, parse_cookies, fun(_) -> error(badarg) end), - %% Mock emqx_zone - ok = meck:new(emqx_zone, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_zone, oom_policy, - fun(_) -> #{max_heap_size => 838860800, - message_queue_len => 8000 - } - end), %% Mock emqx_access_control ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end), @@ -96,7 +89,6 @@ end_per_testcase(TestCase, _Config) when -> lists:foreach(fun meck:unload/1, [cowboy_req, - emqx_zone, emqx_access_control, emqx_broker, emqx_hooks, @@ -124,12 +116,16 @@ t_info(_) -> sockstate := running } = SockInfo. +set_ws_opts(Key, Val) -> + emqx_config:put_listener_conf(default, mqtt_ws, [websocket, Key], Val). + t_header(_) -> - ok = meck:expect(cowboy_req, header, fun(<<"x-forwarded-for">>, _, _) -> <<"100.100.100.100, 99.99.99.99">>; - (<<"x-forwarded-port">>, _, _) -> <<"1000">> end), - {ok, St, _} = ?ws_conn:websocket_init([req, [{zone, external}, - {proxy_address_header, <<"x-forwarded-for">>}, - {proxy_port_header, <<"x-forwarded-port">>}]]), + ok = meck:expect(cowboy_req, header, + fun(<<"x-forwarded-for">>, _, _) -> <<"100.100.100.100, 99.99.99.99">>; + (<<"x-forwarded-port">>, _, _) -> <<"1000">> end), + set_ws_opts(proxy_address_header, <<"x-forwarded-for">>), + set_ws_opts(proxy_port_header, <<"x-forwarded-port">>), + {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => mqtt_ws}]), WsPid = spawn(fun() -> receive {call, From, info} -> gen_server:reply(From, ?ws_conn:info(St)) @@ -450,15 +446,6 @@ t_run_gc(_) -> WsSt = st(#{gc_state => GcSt}), ?ws_conn:run_gc(#{cnt => 100, oct => 10000}, WsSt). -t_check_oom(_) -> - %%Policy = #{max_heap_size => 10, message_queue_len => 10}, - %%meck:expect(emqx_zone, oom_policy, fun(_) -> Policy end), - _St = ?ws_conn:check_oom(st()), - ok = timer:sleep(10). - %%receive {shutdown, proc_heap_too_large} -> ok - %%after 0 -> error(expect_shutdown) - %%end. - t_enqueue(_) -> Packet = ?PUBLISH_PACKET(?QOS_0), St = ?ws_conn:enqueue(Packet, st()), @@ -473,7 +460,7 @@ t_shutdown(_) -> st() -> st(#{}). st(InitFields) when is_map(InitFields) -> - {ok, St, _} = ?ws_conn:websocket_init([req, [{zone, external}]]), + {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => mqtt_ws}]), maps:fold(fun(N, V, S) -> ?ws_conn:set_field(N, V, S) end, ?ws_conn:set_field(channel, channel(), St), InitFields @@ -493,7 +480,8 @@ channel(InitFields) -> receive_maximum => 100, expiry_interval => 0 }, - ClientInfo = #{zone => zone, + ClientInfo = #{zone => default, + listener => mqtt_ws, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, @@ -502,13 +490,13 @@ channel(InitFields) -> peercert => undefined, mountpoint => undefined }, - Session = emqx_session:init(#{zone => default, listener => mqtt_tcp}, + Session = emqx_session:init(#{zone => default, listener => mqtt_ws}, #{receive_maximum => 0} ), maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, [{zone, zone}]), + emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_ws}), maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected