diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index d96754085..88636785c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -69,12 +69,24 @@ init_per_testcase(t_path_not_found, Config) -> ), ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; +init_per_testcase(t_too_many_requests, Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. -end_per_testcase(t_path_not_found, _Config) -> +end_per_testcase(TestCase, _Config) when + TestCase =:= t_path_not_found; + TestCase =:= t_too_many_requests +-> ok = emqx_connector_web_hook_server:stop(), + persistent_term:erase({?MODULE, times_called}), emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok; @@ -220,7 +232,7 @@ bridge_async_config(#{port := Port} = Config) -> " body = \"${id}\"\n" " resource_opts {\n" " inflight_window = 100\n" - " health_check_interval = \"15s\"\n" + " health_check_interval = \"200ms\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_ttl = \"~p\"\n" @@ -284,6 +296,47 @@ not_found_http_handler() -> {ok, Rep, State} end. +too_many_requests_http_handler() -> + GetAndBump = + fun() -> + NCalled = persistent_term:get({?MODULE, times_called}, 0), + persistent_term:put({?MODULE, times_called}, NCalled + 1), + NCalled + 1 + end, + TestPid = self(), + fun(Req0, State) -> + N = GetAndBump(), + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = + case N >= 2 of + true -> + cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"ok">>, + Req + ); + false -> + cowboy_req:reply( + 429, + #{<<"content-type">> => <<"text/plain">>}, + <<"slow down, buddy">>, + Req + ) + end, + {ok, Rep, State} + end. + +wait_http_request() -> + receive + {http, _Headers, _Req} -> + ok + after 1_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("http request not made") + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -422,13 +475,7 @@ t_path_not_found(Config) -> {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), Msg = emqx_message:make(MQTTTopic, <<"{}">>), emqx:publish(Msg), - receive - {http, _Headers, _Req} -> - ok - after 1_000 -> - ct:pal("mailbox: ~p", [process_info(self(), messages)]), - ct:fail("http request not made") - end, + wait_http_request(), ?retry( _Interval = 500, _NAttempts = 20, @@ -452,6 +499,47 @@ t_path_not_found(Config) -> ), ok. +t_too_many_requests(Config) -> + ?check_trace( + begin + #{port := Port, path := Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => Path + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + %% should retry + wait_http_request(), + wait_http_request(), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 0, + success := 1 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), + ok + end + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index dba15b335..c9438c5bd 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -642,8 +642,15 @@ transform_result(Result) -> Result; {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> Result; + {ok, _TooManyRequests = StatusCode = 429, Headers} -> + {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}}; {ok, StatusCode, Headers} -> {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}}; + {ok, _TooManyRequests = StatusCode = 429, Headers, Body} -> + {error, + {recoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}}; {ok, StatusCode, Headers, Body} -> {error, {unrecoverable_error, #{