diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index f8159472b..88636785c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -27,6 +27,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, <<"webhook">>). -define(BRIDGE_NAME, atom_to_binary(?MODULE)). @@ -60,10 +61,35 @@ init_per_testcase(t_send_async_connection_timeout, Config) -> ResponseDelayMS = 500, Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; +init_per_testcase(t_path_not_found, Config) -> + HTTPPath = <<"/nonexisting/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; +init_per_testcase(t_too_many_requests, Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. +end_per_testcase(TestCase, _Config) when + TestCase =:= t_path_not_found; + TestCase =:= t_too_many_requests +-> + ok = emqx_connector_web_hook_server:stop(), + persistent_term:erase({?MODULE, times_called}), + emqx_bridge_testlib:delete_all_bridges(), + emqx_common_test_helpers:call_janitor(), + ok; end_per_testcase(_TestCase, Config) -> case ?config(http_server, Config) of undefined -> ok; @@ -176,27 +202,37 @@ parse_http_request_assertive(ReqStr0) -> bridge_async_config(#{port := Port} = Config) -> Type = maps:get(type, Config, ?BRIDGE_TYPE), Name = maps:get(name, Config, ?BRIDGE_NAME), + Path = maps:get(path, Config, ""), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, "1s"), RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), + LocalTopic = + case maps:find(local_topic, Config) of + {ok, LT} -> + lists:flatten(["local_topic = \"", LT, "\""]); + error -> + "" + end, ConfigString = io_lib:format( "bridges.~s.~s {\n" - " url = \"http://localhost:~p\"\n" + " url = \"http://localhost:~p~s\"\n" " connect_timeout = \"~p\"\n" " enable = true\n" + %% local_topic + " ~s\n" " enable_pipelining = 100\n" " max_retries = 2\n" " method = \"post\"\n" " pool_size = ~p\n" " pool_type = \"random\"\n" " request_timeout = \"~s\"\n" - " body = \"${id}\"" + " body = \"${id}\"\n" " resource_opts {\n" " inflight_window = 100\n" - " health_check_interval = \"15s\"\n" + " health_check_interval = \"200ms\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_ttl = \"~p\"\n" @@ -213,7 +249,9 @@ bridge_async_config(#{port := Port} = Config) -> Type, Name, Port, + Path, ConnectTimeout, + LocalTopic, PoolSize, RequestTimeout, QueryMode, @@ -244,6 +282,61 @@ make_bridge(Config) -> ), emqx_bridge_resource:bridge_id(Type, Name). +not_found_http_handler() -> + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 404, + #{<<"content-type">> => <<"text/plain">>}, + <<"not found">>, + Req + ), + {ok, Rep, State} + end. + +too_many_requests_http_handler() -> + GetAndBump = + fun() -> + NCalled = persistent_term:get({?MODULE, times_called}, 0), + persistent_term:put({?MODULE, times_called}, NCalled + 1), + NCalled + 1 + end, + TestPid = self(), + fun(Req0, State) -> + N = GetAndBump(), + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = + case N >= 2 of + true -> + cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"ok">>, + Req + ); + false -> + cowboy_req:reply( + 429, + #{<<"content-type">> => <<"text/plain">>}, + <<"slow down, buddy">>, + Req + ) + end, + {ok, Rep, State} + end. + +wait_http_request() -> + receive + {http, _Headers, _Req} -> + ok + after 1_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("http request not made") + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -367,6 +460,86 @@ t_start_stop(Config) -> ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped ). +t_path_not_found(Config) -> + ?check_trace( + begin + #{port := Port, path := Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => Path + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + wait_http_request(), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 1, + success := 0 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), + ok + end + ), + ok. + +t_too_many_requests(Config) -> + ?check_trace( + begin + #{port := Port, path := Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => Path + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + %% should retry + wait_http_request(), + wait_http_request(), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 0, + success := 1 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), + ok + end + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 22024f316..c9438c5bd 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -301,57 +301,23 @@ on_query( ), NRequest = formalize_request(Method, BasePath, Request), Worker = resolve_pool_worker(State, KeyOrNum), - case - ehttpc:request( - Worker, - Method, - NRequest, - Timeout, - Retry - ) - of - {error, Reason} when - Reason =:= econnrefused; - Reason =:= timeout; - Reason =:= {shutdown, normal}; - Reason =:= {shutdown, closed} - -> + Result0 = ehttpc:request( + Worker, + Method, + NRequest, + Timeout, + Retry + ), + Result = transform_result(Result0), + case Result of + {error, {recoverable_error, Reason}} -> ?SLOG(warning, #{ msg => "http_connector_do_request_failed", reason => Reason, connector => InstId }), {error, {recoverable_error, Reason}}; - {error, {closed, _Message} = Reason} -> - %% _Message = "The connection was lost." - ?SLOG(warning, #{ - msg => "http_connector_do_request_failed", - reason => Reason, - connector => InstId - }), - {error, {recoverable_error, Reason}}; - {error, Reason} = Result -> - ?SLOG(error, #{ - msg => "http_connector_do_request_failed", - request => redact(NRequest), - reason => Reason, - connector => InstId - }), - Result; - {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - Result; - {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - Result; - {ok, StatusCode, Headers} -> - ?SLOG(error, #{ - msg => "http connector do request, received error response", - note => "the body will be redacted due to security reasons", - request => redact_request(NRequest), - connector => InstId, - status_code => StatusCode - }), - {error, #{status_code => StatusCode, headers => Headers}}; - {ok, StatusCode, Headers, Body} -> + {error, #{status_code := StatusCode}} -> ?SLOG(error, #{ msg => "http connector do request, received error response.", note => "the body will be redacted due to security reasons", @@ -359,7 +325,17 @@ on_query( connector => InstId, status_code => StatusCode }), - {error, #{status_code => StatusCode, headers => Headers, body => Body}} + Result; + {error, Reason} -> + ?SLOG(error, #{ + msg => "http_connector_do_request_failed", + request => redact(NRequest), + reason => Reason, + connector => InstId + }), + Result; + _Success -> + Result end. on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> @@ -639,8 +615,11 @@ to_bin(Str) when is_list(Str) -> to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -reply_delegator(Context, ReplyFunAndArgs, Result) -> - spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end). +reply_delegator(Context, ReplyFunAndArgs, Result0) -> + spawn(fun() -> + Result = transform_result(Result0), + maybe_retry(Result, Context, ReplyFunAndArgs) + end). transform_result(Result) -> case Result of @@ -657,14 +636,36 @@ transform_result(Result) -> {error, {closed, _Message} = Reason} -> %% _Message = "The connection was lost." {error, {recoverable_error, Reason}}; - _ -> - Result + {error, _Reason} -> + Result; + {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, _TooManyRequests = StatusCode = 429, Headers} -> + {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}}; + {ok, StatusCode, Headers} -> + {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}}; + {ok, _TooManyRequests = StatusCode = 429, Headers, Body} -> + {error, + {recoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}}; + {ok, StatusCode, Headers, Body} -> + {error, + {unrecoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}} end. -maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when +maybe_retry(Result, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when N >= Max -> - Result = transform_result(Result0), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +maybe_retry( + {error, {unrecoverable_error, #{status_code := _}}} = Result, _Context, ReplyFunAndArgs +) -> + %% request was successful, but we got an error response; no need to retry emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> #{ @@ -676,12 +677,18 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> timeout := Timeout } = Context, %% TODO: reset the expiration time for free retries? - IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal}, + IsFreeRetry = + case Reason of + {recoverable_error, normal} -> true; + {recoverable_error, {shutdown, normal}} -> true; + _ -> false + end, NContext = case IsFreeRetry of true -> Context; false -> Context#{attempt := Attempt + 1} end, + ?tp(webhook_will_retry_async, #{}), Worker = resolve_pool_worker(State, KeyOrNum), ok = ehttpc:request_async( Worker, diff --git a/changes/ce/fix-11162.en.md b/changes/ce/fix-11162.en.md new file mode 100644 index 000000000..39273f41f --- /dev/null +++ b/changes/ce/fix-11162.en.md @@ -0,0 +1 @@ +Fixed an issue in webhook bridge where, in async query mode, HTTP status codes like 4XX and 5XX would be treated as successes in the bridge metrics.