test(bridge_http): stablize flaky test case

This commit is contained in:
Thales Macedo Garitezi 2023-10-26 14:12:40 -03:00 committed by Zaiming (Stone) Shi
parent 83e05b0d77
commit 89812f0a7a
1 changed files with 31 additions and 9 deletions

View File

@ -28,6 +28,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-define(BRIDGE_TYPE, <<"webhook">>). -define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)). -define(BRIDGE_NAME, atom_to_binary(?MODULE)).
@ -58,9 +59,20 @@ suite() ->
init_per_testcase(t_bad_bridge_config, Config) -> init_per_testcase(t_bad_bridge_config, Config) ->
Config; Config;
init_per_testcase(t_send_async_connection_timeout, Config) -> init_per_testcase(t_send_async_connection_timeout, Config) ->
HTTPPath = <<"/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ResponseDelayMS = 500, ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), ok = emqx_bridge_http_connector_test_server:set_handler(
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; success_http_handler(#{response_delay => ResponseDelayMS})
),
[
{http_server, #{port => HTTPPort, path => HTTPPath}},
{response_delay_ms, ResponseDelayMS}
| Config
];
init_per_testcase(t_path_not_found, Config) -> init_per_testcase(t_path_not_found, Config) ->
HTTPPath = <<"/nonexisting/path">>, HTTPPath = <<"/nonexisting/path">>,
ServerSSLOpts = false, ServerSSLOpts = false,
@ -98,7 +110,8 @@ end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found; TestCase =:= t_path_not_found;
TestCase =:= t_too_many_requests; TestCase =:= t_too_many_requests;
TestCase =:= t_rule_action_expired; TestCase =:= t_rule_action_expired;
TestCase =:= t_bridge_probes_header_atoms TestCase =:= t_bridge_probes_header_atoms;
TestCase =:= t_send_async_connection_timeout
-> ->
ok = emqx_bridge_http_connector_test_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}), persistent_term:erase({?MODULE, times_called}),
@ -302,11 +315,18 @@ make_bridge(Config) ->
emqx_bridge_resource:bridge_id(Type, Name). emqx_bridge_resource:bridge_id(Type, Name).
success_http_handler() -> success_http_handler() ->
success_http_handler(#{response_delay => 0}).
success_http_handler(Opts) ->
ResponseDelay = maps:get(response_delay, Opts, 0),
TestPid = self(), TestPid = self(),
fun(Req0, State) -> fun(Req0, State) ->
{ok, Body, Req} = cowboy_req:read_body(Req0), {ok, Body, Req} = cowboy_req:read_body(Req0),
Headers = cowboy_req:headers(Req), Headers = cowboy_req:headers(Req),
ct:pal("http request received: ~p", [#{body => Body, headers => Headers}]), ct:pal("http request received: ~p", [
#{body => Body, headers => Headers, response_delay => ResponseDelay}
]),
ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
TestPid ! {http, Headers, Body}, TestPid ! {http, Headers, Body},
Rep = cowboy_req:reply( Rep = cowboy_req:reply(
200, 200,
@ -380,9 +400,10 @@ wait_http_request() ->
%% When the connection time out all the queued requests where dropped in %% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(Config) -> t_send_async_connection_timeout(Config) ->
ResponseDelayMS = ?config(response_delay_ms, Config), ResponseDelayMS = ?config(response_delay_ms, Config),
#{port := Port} = ?config(http_server, Config), #{port := Port, path := Path} = ?config(http_server, Config),
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
path => Path,
pool_size => 1, pool_size => 1,
query_mode => "async", query_mode => "async",
connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms", connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms",
@ -724,16 +745,17 @@ receive_request_notifications(MessageIDs, _ResponseDelay, _Acc) when map_size(Me
ok; ok;
receive_request_notifications(MessageIDs, ResponseDelay, Acc) -> receive_request_notifications(MessageIDs, ResponseDelay, Acc) ->
receive receive
{http_server, received, Req} -> {http, _Headers, Body} ->
RemainingMessageIDs = remove_message_id(MessageIDs, Req), RemainingMessageIDs = remove_message_id(MessageIDs, Body),
receive_request_notifications(RemainingMessageIDs, ResponseDelay, [Req | Acc]) receive_request_notifications(RemainingMessageIDs, ResponseDelay, [Body | Acc])
after (30 * 1000) -> after (30 * 1000) ->
ct:pal("Waited a long time but did not get any message"), ct:pal("Waited a long time but did not get any message"),
ct:pal("Messages received so far:\n ~p", [Acc]), ct:pal("Messages received so far:\n ~p", [Acc]),
ct:pal("Mailbox:\n ~p", [?drainMailbox()]),
ct:fail("All requests did not reach server at least once") ct:fail("All requests did not reach server at least once")
end. end.
remove_message_id(MessageIDs, #{body := IDBin}) -> remove_message_id(MessageIDs, IDBin) ->
ID = erlang:binary_to_integer(IDBin), ID = erlang:binary_to_integer(IDBin),
%% It is acceptable to get the same message more than once %% It is acceptable to get the same message more than once
maps:without([ID], MessageIDs). maps:without([ID], MessageIDs).