From d6797760a1458dc3cb025c038af73df8beffac72 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 16 Mar 2021 16:51:10 +0800 Subject: [PATCH] fix(test): add testcases for ws subprotocols --- test/emqx_ws_connection_SUITE.erl | 93 ++++++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 7 deletions(-) diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 203a05065..71c6e40e5 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -41,7 +41,9 @@ all() -> emqx_ct:all(?MODULE). %% CT callbacks %%-------------------------------------------------------------------- -init_per_suite(Config) -> +init_per_testcase(TestCase, Config) when + TestCase =/= t_ws_sub_protocols_mqtt_equivalents, + TestCase =/= t_ws_sub_protocols_mqtt -> %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end), @@ -75,9 +77,15 @@ init_per_suite(Config) -> ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end), + Config; + +init_per_testcase(_, Config) -> Config. -end_per_suite(_Config) -> + +end_per_testcase(TestCase, Config) when + TestCase =/= t_ws_sub_protocols_mqtt_equivalents, + TestCase =/= t_ws_sub_protocols_mqtt -> lists:foreach(fun meck:unload/1, [cowboy_req, emqx_zone, @@ -85,12 +93,9 @@ end_per_suite(_Config) -> emqx_broker, emqx_hooks, emqx_metrics - ]). + ]); -init_per_testcase(_TestCase, Config) -> - Config. - -end_per_testcase(_TestCase, Config) -> +end_per_testcase(_, Config) -> Config. %%-------------------------------------------------------------------- @@ -145,6 +150,27 @@ t_call(_) -> end), ?assertEqual(Info, ?ws_conn:call(WsPid, info)). +t_ws_sub_protocols_mqtt(_) -> + ok = emqx_ct_helpers:start_apps([]), + {ok, _} = application:ensure_all_started(gun), + ?assertMatch({gun_upgrade, _}, + start_ws_client(#{protocols => [<<"mqtt">>]})), + emqx_ct_helpers:stop_apps([]). + +t_ws_sub_protocols_mqtt_equivalents(_) -> + ok = emqx_ct_helpers:start_apps([]), + {ok, _} = application:ensure_all_started(gun), + %% also support mqtt-v3, mqtt-v3.1.1, mqtt-v5 + ?assertMatch({gun_upgrade, _}, + start_ws_client(#{protocols => [<<"mqtt-v3">>]})), + ?assertMatch({gun_upgrade, _}, + start_ws_client(#{protocols => [<<"mqtt-v3.1.1">>]})), + ?assertMatch({gun_upgrade, _}, + start_ws_client(#{protocols => [<<"mqtt-v5">>]})), + ?assertMatch({gun_response, {_, 400, _}}, + start_ws_client(#{protocols => [<<"not-mqtt">>]})), + emqx_ct_helpers:stop_apps([]). + t_init(_) -> Opts = [{idle_timeout, 300000}, {fail_if_no_subprotocol, false}, @@ -395,3 +421,56 @@ channel(InitFields) -> conn_state => connected }, InitFields)). +start_ws_client(State) -> + Host = maps:get(host, State, "127.0.0.1"), + Port = maps:get(port, State, 8083), + {ok, WPID} = gun:open(Host, Port), + #{result := Result} = ws_client(State#{wpid => WPID}), + gun:close(WPID), + Result. + +ws_client(State) -> + receive + {gun_up, WPID, _Proto} -> + #{protocols := Protos} = State, + StreamRef = gun:ws_upgrade(WPID, "/mqtt", [], + #{protocols => [{P, gun_ws_h} || P <- Protos]}), + ws_client(State#{wref => StreamRef}); + {gun_down, _WPID, _, Reason, _, _} -> + State#{result => {gun_down, Reason}}; + {gun_upgrade, _WPID, _Ref, _Proto, Data} -> + ct:pal("-- gun_upgrade: ~p", [Data]), + State#{result => {gun_upgrade, Data}}; + {gun_response, _WPID, _Ref, _Code, _HttpStatusCode, _Headers} -> + Rsp = {_Code, _HttpStatusCode, _Headers}, + ct:pal("-- gun_response: ~p", [Rsp]), + State#{result => {gun_response, Rsp}}; + {gun_error, _WPID, _Ref, _Reason} -> + State#{result => {gun_error, _Reason}}; + {'DOWN',_PID,process,_WPID,_Reason} -> + State#{result => {down, _Reason}}; + {gun_ws, _WPID, Frame} -> + case Frame of + close -> + self() ! stop; + {close,_Code,_Message} -> + self() ! stop; + {text,TextData} -> + io:format("Received Text Frame: ~p~n",[TextData]); + {binary,BinData} -> + io:format("Received Binary Frame: ~p~n",[BinData]); + _ -> + io:format("Received Unhandled Frame: ~p~n",[Frame]) + end, + ws_client(State); + stop -> + #{wpid := WPID} = State, + gun:flush(WPID), + gun:shutdown(WPID), + State#{result => {stop, normal}}; + Message -> + ct:pal("Received Unknown Message on Gun: ~p~n",[Message]), + ws_client(State) + after 1000 -> + ct:fail(ws_timeout) + end.