fix(test): add testcases for ws subprotocols
This commit is contained in:
parent
08366c2735
commit
d6797760a1
|
@ -41,7 +41,9 @@ all() -> emqx_ct:all(?MODULE).
|
|||
%% CT callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init_per_suite(Config) ->
|
||||
init_per_testcase(TestCase, Config) when
|
||||
TestCase =/= t_ws_sub_protocols_mqtt_equivalents,
|
||||
TestCase =/= t_ws_sub_protocols_mqtt ->
|
||||
%% Mock cowboy_req
|
||||
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
|
||||
ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end),
|
||||
|
@ -75,9 +77,15 @@ init_per_suite(Config) ->
|
|||
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
|
||||
Config;
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
||||
end_per_testcase(TestCase, Config) when
|
||||
TestCase =/= t_ws_sub_protocols_mqtt_equivalents,
|
||||
TestCase =/= t_ws_sub_protocols_mqtt ->
|
||||
lists:foreach(fun meck:unload/1,
|
||||
[cowboy_req,
|
||||
emqx_zone,
|
||||
|
@ -85,12 +93,9 @@ end_per_suite(_Config) ->
|
|||
emqx_broker,
|
||||
emqx_hooks,
|
||||
emqx_metrics
|
||||
]).
|
||||
]);
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
end_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -145,6 +150,27 @@ t_call(_) ->
|
|||
end),
|
||||
?assertEqual(Info, ?ws_conn:call(WsPid, info)).
|
||||
|
||||
t_ws_sub_protocols_mqtt(_) ->
|
||||
ok = emqx_ct_helpers:start_apps([]),
|
||||
{ok, _} = application:ensure_all_started(gun),
|
||||
?assertMatch({gun_upgrade, _},
|
||||
start_ws_client(#{protocols => [<<"mqtt">>]})),
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_ws_sub_protocols_mqtt_equivalents(_) ->
|
||||
ok = emqx_ct_helpers:start_apps([]),
|
||||
{ok, _} = application:ensure_all_started(gun),
|
||||
%% also support mqtt-v3, mqtt-v3.1.1, mqtt-v5
|
||||
?assertMatch({gun_upgrade, _},
|
||||
start_ws_client(#{protocols => [<<"mqtt-v3">>]})),
|
||||
?assertMatch({gun_upgrade, _},
|
||||
start_ws_client(#{protocols => [<<"mqtt-v3.1.1">>]})),
|
||||
?assertMatch({gun_upgrade, _},
|
||||
start_ws_client(#{protocols => [<<"mqtt-v5">>]})),
|
||||
?assertMatch({gun_response, {_, 400, _}},
|
||||
start_ws_client(#{protocols => [<<"not-mqtt">>]})),
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_init(_) ->
|
||||
Opts = [{idle_timeout, 300000},
|
||||
{fail_if_no_subprotocol, false},
|
||||
|
@ -395,3 +421,56 @@ channel(InitFields) ->
|
|||
conn_state => connected
|
||||
}, InitFields)).
|
||||
|
||||
start_ws_client(State) ->
|
||||
Host = maps:get(host, State, "127.0.0.1"),
|
||||
Port = maps:get(port, State, 8083),
|
||||
{ok, WPID} = gun:open(Host, Port),
|
||||
#{result := Result} = ws_client(State#{wpid => WPID}),
|
||||
gun:close(WPID),
|
||||
Result.
|
||||
|
||||
ws_client(State) ->
|
||||
receive
|
||||
{gun_up, WPID, _Proto} ->
|
||||
#{protocols := Protos} = State,
|
||||
StreamRef = gun:ws_upgrade(WPID, "/mqtt", [],
|
||||
#{protocols => [{P, gun_ws_h} || P <- Protos]}),
|
||||
ws_client(State#{wref => StreamRef});
|
||||
{gun_down, _WPID, _, Reason, _, _} ->
|
||||
State#{result => {gun_down, Reason}};
|
||||
{gun_upgrade, _WPID, _Ref, _Proto, Data} ->
|
||||
ct:pal("-- gun_upgrade: ~p", [Data]),
|
||||
State#{result => {gun_upgrade, Data}};
|
||||
{gun_response, _WPID, _Ref, _Code, _HttpStatusCode, _Headers} ->
|
||||
Rsp = {_Code, _HttpStatusCode, _Headers},
|
||||
ct:pal("-- gun_response: ~p", [Rsp]),
|
||||
State#{result => {gun_response, Rsp}};
|
||||
{gun_error, _WPID, _Ref, _Reason} ->
|
||||
State#{result => {gun_error, _Reason}};
|
||||
{'DOWN',_PID,process,_WPID,_Reason} ->
|
||||
State#{result => {down, _Reason}};
|
||||
{gun_ws, _WPID, Frame} ->
|
||||
case Frame of
|
||||
close ->
|
||||
self() ! stop;
|
||||
{close,_Code,_Message} ->
|
||||
self() ! stop;
|
||||
{text,TextData} ->
|
||||
io:format("Received Text Frame: ~p~n",[TextData]);
|
||||
{binary,BinData} ->
|
||||
io:format("Received Binary Frame: ~p~n",[BinData]);
|
||||
_ ->
|
||||
io:format("Received Unhandled Frame: ~p~n",[Frame])
|
||||
end,
|
||||
ws_client(State);
|
||||
stop ->
|
||||
#{wpid := WPID} = State,
|
||||
gun:flush(WPID),
|
||||
gun:shutdown(WPID),
|
||||
State#{result => {stop, normal}};
|
||||
Message ->
|
||||
ct:pal("Received Unknown Message on Gun: ~p~n",[Message]),
|
||||
ws_client(State)
|
||||
after 1000 ->
|
||||
ct:fail(ws_timeout)
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue