%%-------------------------------------------------------------------- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_http_SUITE). %% This suite should contains testcases that are specific for the webhook %% bridge. There are also some test cases that implicitly tests the webhook %% bridge in emqx_bridge_api_SUITE -compile(nowarn_export_all). -compile(export_all). -import(emqx_mgmt_api_test_util, [request/3, uri/1]). -import(emqx_common_test_helpers, [on_exit/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). -define(BRIDGE_TYPE, <<"webhook">>). -define(BRIDGE_NAME, atom_to_binary(?MODULE)). all() -> emqx_common_test_helpers:all(?MODULE). groups() -> []. init_per_suite(Config0) -> Config = case os:getenv("DEBUG_CASE") of [_ | _] = DebugCase -> CaseName = list_to_atom(DebugCase), [{debug_case, CaseName} | Config0]; _ -> Config0 end, Apps = emqx_cth_suite:start( [ emqx, emqx_conf, emqx_connector, emqx_bridge_http, emqx_bridge, emqx_rule_engine ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), emqx_mgmt_api_test_util:init_suite(), [{apps, Apps} | Config]. end_per_suite(Config) -> Apps = ?config(apps, Config), emqx_mgmt_api_test_util:end_suite(), ok = emqx_cth_suite:stop(Apps), ok. suite() -> [{timetrap, {seconds, 60}}]. init_per_testcase(t_bad_bridge_config, Config) -> Config; init_per_testcase(t_send_async_connection_timeout, Config) -> HTTPPath = <<"/path">>, ServerSSLOpts = false, {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), ResponseDelayMS = 500, ok = emqx_bridge_http_connector_test_server:set_handler( success_http_handler(#{response_delay => ResponseDelayMS}) ), [ {http_server, #{port => HTTPPort, path => HTTPPath}}, {response_delay_ms, ResponseDelayMS} | Config ]; init_per_testcase(t_path_not_found, Config) -> HTTPPath = <<"/nonexisting/path">>, ServerSSLOpts = false, {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(t_too_many_requests, Config) -> HTTPPath = <<"/path">>, ServerSSLOpts = false, {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(t_rule_action_expired, Config) -> [ {bridge_name, ?BRIDGE_NAME} | Config ]; init_per_testcase(t_bridge_probes_header_atoms, Config) -> HTTPPath = <<"/path">>, ServerSSLOpts = false, {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. end_per_testcase(TestCase, _Config) when TestCase =:= t_path_not_found; TestCase =:= t_too_many_requests; TestCase =:= t_rule_action_expired; TestCase =:= t_bridge_probes_header_atoms; TestCase =:= t_send_async_connection_timeout -> ok = emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), emqx_bridge_v2_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_connectors(), emqx_common_test_helpers:call_janitor(), ok; end_per_testcase(_TestCase, Config) -> case ?config(http_server, Config) of undefined -> ok; Server -> stop_http_server(Server) end, emqx_bridge_v2_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_connectors(), emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ %% HTTP server for testing %% (Originally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> process_flag(trap_exit, true), Parent = self(), ct:pal("Starting server for ~p", [Parent]), {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), ct:pal("Started server on port ~p", [Port]), timer:sleep(100), #{port => Port, sock => Sock, acceptor => Acceptor}. stop_http_server(#{sock := Sock, acceptor := Acceptor}) -> ct:pal("Stop server\n"), exit(Acceptor, kill), gen_tcp:close(Sock). listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], case gen_tcp:listen(0, SockOpts) of {ok, Sock} -> {ok, Port} = inet:port(Sock), {ok, {Port, Sock}}; {error, Reason} when Reason =/= eaddrinuse -> {error, Reason} end. accept_loop(Sock, Parent, HTTPServerConfig) -> process_flag(trap_exit, true), case gen_tcp:accept(Sock) of {ok, Conn} -> spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, <<>>) end), %%gen_tcp:controlling_process(Conn, Handler), accept_loop(Sock, Parent, HTTPServerConfig); {error, closed} -> %% socket owner died ok end. make_response(CodeStr, Str) -> B = iolist_to_binary(Str), iolist_to_binary( io_lib:fwrite( "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", [CodeStr, size(B), B] ) ). handle_fun_200_ok(Conn, Parent, HTTPServerConfig, Acc) -> ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), ct:pal("Waiting for request~n"), case gen_tcp:recv(Conn, 0) of {ok, ReqStr} -> ct:pal("The http handler got request: ~p", [ReqStr]), case parse_http_request(<>) of {ok, incomplete, NewAcc} -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc); {ok, Req, NewAcc} -> timer:sleep(ResponseDelayMS), Parent ! {http_server, received, Req}, gen_tcp:send(Conn, make_response("200 OK", "Request OK")), handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc) end; {error, closed} -> ct:pal("http connection closed"); {error, Reason} -> ct:pal("the http handler recv error: ~p", [Reason]), timer:sleep(100), gen_tcp:close(Conn) end. parse_http_request(ReqStr) -> try parse_http_request_assertive(ReqStr) catch _:_ -> {ok, incomplete, ReqStr} end. parse_http_request_assertive(ReqStr0) -> %% find body length [_, LengthStr0] = string:split(ReqStr0, "content-length:"), [LengthStr, _] = string:split(LengthStr0, "\r\n"), Length = binary_to_integer(string:trim(LengthStr, both)), %% split between multiple requests [Method, ReqStr1] = string:split(ReqStr0, " ", leading), [Path, ReqStr2] = string:split(ReqStr1, " ", leading), [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), [_HeaderStr, Rest] = string:split(ReqStr3, "\r\n\r\n", leading), <> = Rest, {ok, #{method => Method, path => Path, body => Body}, Remain}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Helper functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% get_metrics(Name) -> %% Note: `emqx_bridge:get_metrics/2' is currently *only* called in prod by %% `emqx_bridge_api:lookup_from_local_node' with an action (not v1 bridge) type. Type = <<"http">>, emqx_bridge:get_metrics(Type, Name). bridge_async_config(#{port := Port} = Config) -> Type = maps:get(type, Config, ?BRIDGE_TYPE), Name = maps:get(name, Config, ?BRIDGE_NAME), Host = maps:get(host, Config, "localhost"), Path = maps:get(path, Config, ""), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, "1s"), RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), LocalTopic = case maps:find(local_topic, Config) of {ok, LT} -> lists:flatten(["local_topic = \"", LT, "\""]); error -> "" end, ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://~s:~p~s\"\n" " connect_timeout = \"~p\"\n" " enable = true\n" %% local_topic " ~s\n" " enable_pipelining = 100\n" " max_retries = 2\n" " method = \"post\"\n" " pool_size = ~p\n" " pool_type = \"random\"\n" " request_timeout = \"~s\"\n" " body = \"${id}\"\n" " resource_opts {\n" " inflight_window = 100\n" " health_check_interval = \"~s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_ttl = \"~p\"\n" " resume_interval = \"~s\"\n" " start_after_created = \"true\"\n" " start_timeout = \"5s\"\n" " worker_pool_size = \"1\"\n" " }\n" " ssl {\n" " enable = false\n" " }\n" "}\n", [ Type, Name, Host, Port, Path, ConnectTimeout, LocalTopic, PoolSize, RequestTimeout, HealthCheckInterval, QueryMode, ResourceRequestTTL, ResumeInterval ] ), ct:pal(ConfigString), parse_and_check(ConfigString, Type, Name). parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, RetConfig. make_bridge(Config) -> Type = ?BRIDGE_TYPE, Name = ?BRIDGE_NAME, BridgeConfig = bridge_async_config(Config#{ name => Name, type => Type }), {ok, _} = emqx_bridge:create( Type, Name, BridgeConfig ), emqx_bridge_resource:bridge_id(Type, Name). success_http_handler() -> success_http_handler(#{response_delay => 0}). success_http_handler(Opts) -> ResponseDelay = maps:get(response_delay, Opts, 0), TestPid = self(), fun(Req0, State) -> {ok, Body, Req} = cowboy_req:read_body(Req0), Headers = cowboy_req:headers(Req), ct:pal("http request received: ~p", [ #{body => Body, headers => Headers, response_delay => ResponseDelay} ]), ResponseDelay > 0 andalso timer:sleep(ResponseDelay), TestPid ! {http, Headers, Body}, Rep = cowboy_req:reply( 200, #{<<"content-type">> => <<"text/plain">>}, <<"hello">>, Req ), {ok, Rep, State} end. not_found_http_handler() -> TestPid = self(), fun(Req0, State) -> {ok, Body, Req} = cowboy_req:read_body(Req0), TestPid ! {http, cowboy_req:headers(Req), Body}, Rep = cowboy_req:reply( 404, #{<<"content-type">> => <<"text/plain">>}, <<"not found">>, Req ), {ok, Rep, State} end. too_many_requests_http_handler() -> GetAndBump = fun() -> NCalled = persistent_term:get({?MODULE, times_called}, 0), persistent_term:put({?MODULE, times_called}, NCalled + 1), NCalled + 1 end, TestPid = self(), fun(Req0, State) -> N = GetAndBump(), {ok, Body, Req} = cowboy_req:read_body(Req0), TestPid ! {http, cowboy_req:headers(Req), Body}, Rep = case N >= 2 of true -> cowboy_req:reply( 200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req ); false -> cowboy_req:reply( 429, #{<<"content-type">> => <<"text/plain">>}, <<"slow down, buddy">>, Req ) end, {ok, Rep, State} end. wait_http_request() -> receive {http, _Headers, _Req} -> ok after 1_000 -> ct:pal("mailbox: ~p", [process_info(self(), messages)]), ct:fail("http request not made") end. %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% When the connection time out all the queued requests where dropped in t_send_async_connection_timeout(Config) -> ResponseDelayMS = ?config(response_delay_ms, Config), #{port := Port, path := Path} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, path => Path, pool_size => 1, query_mode => "async", connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms", request_timeout => "10s", resume_interval => "200ms", health_check_interval => "200ms", resource_request_ttl => "infinity" }), ResourceId = emqx_bridge_resource:resource_id(BridgeID), ?retry( _Interval0 = 200, _NAttempts0 = 20, ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), NumberOfMessagesToSend = 10, [ do_send_message(#{<<"id">> => Id}) || Id <- lists:seq(1, NumberOfMessagesToSend) ], %% Make sure server receives all messages ct:pal("Sent messages\n"), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), receive_request_notifications(MessageIDs, ResponseDelayMS, []), ok. t_async_free_retries(Config) -> #{port := Port} = ?config(http_server, Config), _BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", connect_timeout => "1s", request_timeout => "10s", resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. Context = #{error_attempts => 5}, ExpectedAttempts = 6, Fn = fun(Get, Error) -> ?assertMatch( {ok, 200, _, _}, do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) end, do_t_async_retries(?FUNCTION_NAME, Context, {error, normal}, Fn), do_t_async_retries(?FUNCTION_NAME, Context, {error, {shutdown, normal}}, Fn), ok. t_async_common_retries(Config) -> #{port := Port} = ?config(http_server, Config), _BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", resume_interval => "100ms", connect_timeout => "1s", request_timeout => "10s", resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. Context = #{error_attempts => infinity}, ExpectedAttempts = 3, FnSucceed = fun(Get, Error) -> ?assertMatch( {ok, 200, _, _}, do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error, attempts => Get()} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) end, FnFail = fun(Get, Error) -> ?assertMatch( Error, do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error, attempts => Get()} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) end, %% These two succeed because they're further retried by the buffer %% worker synchronously, and we're not mock that call. do_t_async_retries( ?FUNCTION_NAME, Context, {error, {closed, "The connection was lost."}}, FnSucceed ), do_t_async_retries(?FUNCTION_NAME, Context, {error, {shutdown, closed}}, FnSucceed), %% This fails because this error is treated as unrecoverable. do_t_async_retries(?FUNCTION_NAME, Context, {error, something_else}, FnFail), ok. t_bad_bridge_config(_Config) -> BridgeConfig = bridge_async_config(#{port => 12345}), ?assertMatch( {ok, {{_, 201, _}, _Headers, #{ <<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">> }}}, emqx_bridge_testlib:create_bridge_api( ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig ) ), %% try `/start` bridge ?assertMatch( {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}}, emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME) ), ok. t_start_stop(Config) -> #{port := Port} = ?config(http_server, Config), BridgeConfig = bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port }), emqx_bridge_testlib:t_start_stop( ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped ). t_path_not_found(Config) -> ?check_trace( begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, BridgeConfig = bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, port => Port, path => Path }), {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), Msg = emqx_message:make(MQTTTopic, <<"{}">>), emqx:publish(Msg), wait_http_request(), ?retry( _Interval = 500, _NAttempts = 20, ?assertMatch( #{ counters := #{ matched := 1, failed := 1, success := 0 } }, get_metrics(?BRIDGE_NAME) ) ), ok end, fun(Trace) -> ?assertEqual([], ?of_kind(http_will_retry_async, Trace)), ok end ), ok. t_too_many_requests(Config) -> ?check_trace( begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, BridgeConfig = bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, port => Port, path => Path }), {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), Msg = emqx_message:make(MQTTTopic, <<"{}">>), emqx:publish(Msg), %% should retry wait_http_request(), wait_http_request(), ?retry( _Interval = 500, _NAttempts = 20, ?assertMatch( #{ counters := #{ matched := 1, failed := 0, success := 1 } }, get_metrics(?BRIDGE_NAME) ) ), ok end, fun(Trace) -> ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)), ok end ), ok. t_rule_action_expired(Config) -> ?check_trace( begin RuleTopic = <<"t/webhook/rule">>, BridgeConfig = bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, host => "non.existent.host", port => 9999, path => <<"/some/path">>, resume_interval => "100ms", connect_timeout => "1s", request_timeout => "100ms", resource_request_ttl => "100ms" }), {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), {ok, #{<<"id">> := RuleId}} = emqx_bridge_testlib:create_rule_and_action_http(?BRIDGE_TYPE, RuleTopic, Config), Msg = emqx_message:make(RuleTopic, <<"timeout">>), emqx:publish(Msg), ?retry( _Interval = 500, _NAttempts = 20, ?assertMatch( #{ counters := #{ matched := 1, failed := 0, dropped := 1 } }, get_metrics(?BRIDGE_NAME) ) ), ?retry( _Interval = 500, _NAttempts = 20, ?assertMatch( #{ counters := #{ matched := 1, 'actions.failed' := 1, 'actions.failed.unknown' := 1, 'actions.total' := 1 } }, emqx_metrics_worker:get_metrics(rule_metrics, RuleId) ) ), ok end, [] ), ok. t_bridge_probes_header_atoms(Config) -> #{port := Port, path := Path} = ?config(http_server, Config), ?check_trace( begin LocalTopic = <<"t/local/topic">>, BridgeConfig0 = bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port, path => Path, resume_interval => "100ms", connect_timeout => "1s", request_timeout => "100ms", resource_request_ttl => "100ms", local_topic => LocalTopic }), BridgeConfig = BridgeConfig0#{ <<"headers">> => #{ <<"some-non-existent-atom">> => <<"x">> } }, ?assertMatch( {ok, {{_, 204, _}, _Headers, _Body}}, probe_bridge_api(BridgeConfig) ), ?assertMatch( {ok, {{_, 201, _}, _Headers, _Body}}, emqx_bridge_testlib:create_bridge_api( ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig ) ), Msg = emqx_message:make(LocalTopic, <<"hi">>), emqx:publish(Msg), receive {http, Headers, _Body} -> ?assertMatch(#{<<"some-non-existent-atom">> := <<"x">>}, Headers), ok after 5_000 -> ct:pal("mailbox: ~p", [process_info(self(), messages)]), ct:fail("request not made") end, ok end, [] ), ok. %% helpers do_send_message(Message) -> Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE), emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}). do_t_async_retries(TestCase, TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, PTKey = {?MODULE, TestCase, attempts}, persistent_term:put(PTKey, 0), on_exit(fun() -> persistent_term:erase(PTKey) end), Get = fun() -> persistent_term:get(PTKey) end, GetAndBump = fun() -> Attempts = persistent_term:get(PTKey), persistent_term:put(PTKey, Attempts + 1), Attempts + 1 end, emqx_common_test_helpers:with_mock( emqx_bridge_http_connector, reply_delegator, fun(Context, ReplyFunAndArgs, Result) -> Attempts = GetAndBump(), case Attempts > ErrorAttempts of true -> ct:pal("succeeding ~p : ~p", [Error, Attempts]), meck:passthrough([Context, ReplyFunAndArgs, Result]); false -> ct:pal("failing ~p : ~p", [Error, Attempts]), meck:passthrough([Context, ReplyFunAndArgs, Error]) end end, fun() -> Fn(Get, Error) end ), persistent_term:erase(PTKey), ok. receive_request_notifications(MessageIDs, _ResponseDelay, _Acc) when map_size(MessageIDs) =:= 0 -> ok; receive_request_notifications(MessageIDs, ResponseDelay, Acc) -> receive {http, _Headers, Body} -> RemainingMessageIDs = remove_message_id(MessageIDs, Body), receive_request_notifications(RemainingMessageIDs, ResponseDelay, [Body | Acc]) after (30 * 1000) -> ct:pal("Waited a long time but did not get any message"), ct:pal("Messages received so far:\n ~p", [Acc]), ct:pal("Mailbox:\n ~p", [?drainMailbox()]), ct:fail("All requests did not reach server at least once") end. remove_message_id(MessageIDs, IDBin) -> ID = erlang:binary_to_integer(IDBin), %% It is acceptable to get the same message more than once maps:without([ID], MessageIDs). probe_bridge_api(BridgeConfig) -> emqx_bridge_testlib:probe_bridge_api(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig).