fix(bridge): add test cases for sending msgs via http bridge

This commit is contained in:
Shawn 2021-12-28 11:35:47 +08:00
parent 1cd226c18b
commit eb992ad2ad
1 changed files with 49 additions and 13 deletions

View File

@ -47,7 +47,7 @@ groups() ->
[]. [].
suite() -> suite() ->
[{timetrap,{seconds,30}}]. [{timetrap,{seconds,60}}].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_config:put([emqx_dashboard], #{ ok = emqx_config:put([emqx_dashboard], #{
@ -84,7 +84,7 @@ start_http_server(HandleFun) ->
spawn_link(fun() -> spawn_link(fun() ->
{Port, Sock} = listen_on_random_port(), {Port, Sock} = listen_on_random_port(),
Parent ! {port, Port}, Parent ! {port, Port},
loop(Sock, HandleFun) loop(Sock, HandleFun, Parent)
end), end),
receive receive
{port, Port} -> Port {port, Port} -> Port
@ -95,40 +95,49 @@ start_http_server(HandleFun) ->
listen_on_random_port() -> listen_on_random_port() ->
Min = 1024, Max = 65000, Min = 1024, Max = 65000,
Port = rand:uniform(Max - Min) + Min, Port = rand:uniform(Max - Min) + Min,
case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of
{ok, Sock} -> {Port, Sock}; {ok, Sock} -> {Port, Sock};
{error, eaddrinuse} -> listen_on_random_port() {error, eaddrinuse} -> listen_on_random_port()
end. end.
loop(Sock, HandleFun) -> loop(Sock, HandleFun, Parent) ->
{ok, Conn} = gen_tcp:accept(Sock), {ok, Conn} = gen_tcp:accept(Sock),
Handler = spawn(fun () -> HandleFun(Conn) end), Handler = spawn(fun () -> HandleFun(Conn, Parent) end),
gen_tcp:controlling_process(Conn, Handler), gen_tcp:controlling_process(Conn, Handler),
loop(Sock, HandleFun). loop(Sock, HandleFun, Parent).
make_response(CodeStr, Str) -> make_response(CodeStr, Str) ->
B = iolist_to_binary(Str), B = iolist_to_binary(Str),
iolist_to_binary( iolist_to_binary(
io_lib:fwrite( io_lib:fwrite(
"HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s", "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
[CodeStr, size(B), B])). [CodeStr, size(B), B])).
handle_fun_200_ok(Conn) -> handle_fun_200_ok(Conn, Parent) ->
case gen_tcp:recv(Conn, 0) of case gen_tcp:recv(Conn, 0) of
{ok, Request} -> {ok, ReqStr} ->
ct:pal("the http handler got request: ~p", [ReqStr]),
Req = parse_http_request(ReqStr),
Parent ! {http_server, received, Req},
gen_tcp:send(Conn, make_response("200 OK", "Request OK")), gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
self() ! {http_server, received, Request}, handle_fun_200_ok(Conn, Parent);
handle_fun_200_ok(Conn);
{error, closed} -> {error, closed} ->
gen_tcp:close(Conn) gen_tcp:close(Conn)
end. end.
parse_http_request(ReqStr0) ->
[Method, ReqStr1] = string:split(ReqStr0, " ", leading),
[Path, ReqStr2] = string:split(ReqStr1, " ", leading),
[_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
[_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
#{method => Method, path => Path, body => Body}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_http_crud_apis(_) -> t_http_crud_apis(_) ->
Port = start_http_server(fun handle_fun_200_ok/1), Port = start_http_server(fun handle_fun_200_ok/2),
%% assert we there's no bridges at first %% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@ -163,6 +172,20 @@ t_http_crud_apis(_) ->
, <<"message">> := <<"bridge already exists">> , <<"message">> := <<"bridge already exists">>
}, jsx:decode(RetMsg)), }, jsx:decode(RetMsg)),
%% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
receive
{http_server, received, #{method := <<"POST">>, path := <<"/path1">>,
body := Body}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end),
%% update the request-path of the bridge %% update the request-path of the bridge
URL2 = ?URL(Port, "path2"), URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
@ -201,6 +224,19 @@ t_http_crud_apis(_) ->
, <<"url">> := URL2 , <<"url">> := URL2
}, jsx:decode(Bridge3Str)), }, jsx:decode(Bridge3Str)),
%% send an message to emqx again, check the path has been changed
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
receive
{http_server, received, #{path := <<"/path2">>}} ->
true;
Msg2 ->
ct:pal("error: http got unexpected request: ~p", [Msg2]),
false
after 100 ->
false
end),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@ -215,7 +251,7 @@ t_http_crud_apis(_) ->
ok. ok.
t_start_stop_bridges(_) -> t_start_stop_bridges(_) ->
Port = start_http_server(fun handle_fun_200_ok/1), Port = start_http_server(fun handle_fun_200_ok/2),
URL1 = ?URL(Port, "abc"), URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{ ?HTTP_BRIDGE(URL1)#{