From 59b109eb5cc77d34cbbc6aa749f7dff09a8c5b6c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Jun 2023 12:59:24 -0300 Subject: [PATCH 1/2] fix(webhook): treat 404 and other error replies as errors in async requests Fixes https://emqx.atlassian.net/browse/EMQX-10405 The problem here was that, for async requests, ehttpc responses of the form `{ok, 4__, _, _}` and similar were being treated as successes. --- .../test/emqx_bridge_webhook_SUITE.erl | 89 ++++++++++++++- .../src/emqx_connector_http.erl | 106 +++++++++--------- changes/ce/fix-11162.en.md | 1 + 3 files changed, 141 insertions(+), 55 deletions(-) create mode 100644 changes/ce/fix-11162.en.md diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index f8159472b..d96754085 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -27,6 +27,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, <<"webhook">>). -define(BRIDGE_NAME, atom_to_binary(?MODULE)). @@ -60,10 +61,23 @@ init_per_testcase(t_send_async_connection_timeout, Config) -> ResponseDelayMS = 500, Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; +init_per_testcase(t_path_not_found, Config) -> + HTTPPath = <<"/nonexisting/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. +end_per_testcase(t_path_not_found, _Config) -> + ok = emqx_connector_web_hook_server:stop(), + emqx_bridge_testlib:delete_all_bridges(), + emqx_common_test_helpers:call_janitor(), + ok; end_per_testcase(_TestCase, Config) -> case ?config(http_server, Config) of undefined -> ok; @@ -176,24 +190,34 @@ parse_http_request_assertive(ReqStr0) -> bridge_async_config(#{port := Port} = Config) -> Type = maps:get(type, Config, ?BRIDGE_TYPE), Name = maps:get(name, Config, ?BRIDGE_NAME), + Path = maps:get(path, Config, ""), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, "1s"), RequestTimeout = maps:get(request_timeout, Config, "10s"), ResumeInterval = maps:get(resume_interval, Config, "1s"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), + LocalTopic = + case maps:find(local_topic, Config) of + {ok, LT} -> + lists:flatten(["local_topic = \"", LT, "\""]); + error -> + "" + end, ConfigString = io_lib:format( "bridges.~s.~s {\n" - " url = \"http://localhost:~p\"\n" + " url = \"http://localhost:~p~s\"\n" " connect_timeout = \"~p\"\n" " enable = true\n" + %% local_topic + " ~s\n" " enable_pipelining = 100\n" " max_retries = 2\n" " method = \"post\"\n" " pool_size = ~p\n" " pool_type = \"random\"\n" " request_timeout = \"~s\"\n" - " body = \"${id}\"" + " body = \"${id}\"\n" " resource_opts {\n" " inflight_window = 100\n" " health_check_interval = \"15s\"\n" @@ -213,7 +237,9 @@ bridge_async_config(#{port := Port} = Config) -> Type, Name, Port, + Path, ConnectTimeout, + LocalTopic, PoolSize, RequestTimeout, QueryMode, @@ -244,6 +270,20 @@ make_bridge(Config) -> ), emqx_bridge_resource:bridge_id(Type, Name). +not_found_http_handler() -> + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 404, + #{<<"content-type">> => <<"text/plain">>}, + <<"not found">>, + Req + ), + {ok, Rep, State} + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -367,6 +407,51 @@ t_start_stop(Config) -> ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped ). +t_path_not_found(Config) -> + ?check_trace( + begin + #{port := Port, path := Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => Path + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + receive + {http, _Headers, _Req} -> + ok + after 1_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("http request not made") + end, + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 1, + success := 0 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), + ok + end + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 22024f316..dba15b335 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -301,57 +301,23 @@ on_query( ), NRequest = formalize_request(Method, BasePath, Request), Worker = resolve_pool_worker(State, KeyOrNum), - case - ehttpc:request( - Worker, - Method, - NRequest, - Timeout, - Retry - ) - of - {error, Reason} when - Reason =:= econnrefused; - Reason =:= timeout; - Reason =:= {shutdown, normal}; - Reason =:= {shutdown, closed} - -> + Result0 = ehttpc:request( + Worker, + Method, + NRequest, + Timeout, + Retry + ), + Result = transform_result(Result0), + case Result of + {error, {recoverable_error, Reason}} -> ?SLOG(warning, #{ msg => "http_connector_do_request_failed", reason => Reason, connector => InstId }), {error, {recoverable_error, Reason}}; - {error, {closed, _Message} = Reason} -> - %% _Message = "The connection was lost." - ?SLOG(warning, #{ - msg => "http_connector_do_request_failed", - reason => Reason, - connector => InstId - }), - {error, {recoverable_error, Reason}}; - {error, Reason} = Result -> - ?SLOG(error, #{ - msg => "http_connector_do_request_failed", - request => redact(NRequest), - reason => Reason, - connector => InstId - }), - Result; - {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - Result; - {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - Result; - {ok, StatusCode, Headers} -> - ?SLOG(error, #{ - msg => "http connector do request, received error response", - note => "the body will be redacted due to security reasons", - request => redact_request(NRequest), - connector => InstId, - status_code => StatusCode - }), - {error, #{status_code => StatusCode, headers => Headers}}; - {ok, StatusCode, Headers, Body} -> + {error, #{status_code := StatusCode}} -> ?SLOG(error, #{ msg => "http connector do request, received error response.", note => "the body will be redacted due to security reasons", @@ -359,7 +325,17 @@ on_query( connector => InstId, status_code => StatusCode }), - {error, #{status_code => StatusCode, headers => Headers, body => Body}} + Result; + {error, Reason} -> + ?SLOG(error, #{ + msg => "http_connector_do_request_failed", + request => redact(NRequest), + reason => Reason, + connector => InstId + }), + Result; + _Success -> + Result end. on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> @@ -639,8 +615,11 @@ to_bin(Str) when is_list(Str) -> to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -reply_delegator(Context, ReplyFunAndArgs, Result) -> - spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end). +reply_delegator(Context, ReplyFunAndArgs, Result0) -> + spawn(fun() -> + Result = transform_result(Result0), + maybe_retry(Result, Context, ReplyFunAndArgs) + end). transform_result(Result) -> case Result of @@ -657,14 +636,29 @@ transform_result(Result) -> {error, {closed, _Message} = Reason} -> %% _Message = "The connection was lost." {error, {recoverable_error, Reason}}; - _ -> - Result + {error, _Reason} -> + Result; + {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, StatusCode, Headers} -> + {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}}; + {ok, StatusCode, Headers, Body} -> + {error, + {unrecoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}} end. -maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when +maybe_retry(Result, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when N >= Max -> - Result = transform_result(Result0), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +maybe_retry( + {error, {unrecoverable_error, #{status_code := _}}} = Result, _Context, ReplyFunAndArgs +) -> + %% request was successful, but we got an error response; no need to retry emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> #{ @@ -676,12 +670,18 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> timeout := Timeout } = Context, %% TODO: reset the expiration time for free retries? - IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal}, + IsFreeRetry = + case Reason of + {recoverable_error, normal} -> true; + {recoverable_error, {shutdown, normal}} -> true; + _ -> false + end, NContext = case IsFreeRetry of true -> Context; false -> Context#{attempt := Attempt + 1} end, + ?tp(webhook_will_retry_async, #{}), Worker = resolve_pool_worker(State, KeyOrNum), ok = ehttpc:request_async( Worker, diff --git a/changes/ce/fix-11162.en.md b/changes/ce/fix-11162.en.md new file mode 100644 index 000000000..39273f41f --- /dev/null +++ b/changes/ce/fix-11162.en.md @@ -0,0 +1 @@ +Fixed an issue in webhook bridge where, in async query mode, HTTP status codes like 4XX and 5XX would be treated as successes in the bridge metrics. From ca435975de45e2ceb3eb01105356179cf45c492f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 30 Jun 2023 09:46:03 -0300 Subject: [PATCH 2/2] fix(webhook): treat http status code 429 as recoverable --- .../test/emqx_bridge_webhook_SUITE.erl | 106 ++++++++++++++++-- .../src/emqx_connector_http.erl | 7 ++ 2 files changed, 104 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index d96754085..88636785c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -69,12 +69,24 @@ init_per_testcase(t_path_not_found, Config) -> ), ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; +init_per_testcase(t_too_many_requests, Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. -end_per_testcase(t_path_not_found, _Config) -> +end_per_testcase(TestCase, _Config) when + TestCase =:= t_path_not_found; + TestCase =:= t_too_many_requests +-> ok = emqx_connector_web_hook_server:stop(), + persistent_term:erase({?MODULE, times_called}), emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok; @@ -220,7 +232,7 @@ bridge_async_config(#{port := Port} = Config) -> " body = \"${id}\"\n" " resource_opts {\n" " inflight_window = 100\n" - " health_check_interval = \"15s\"\n" + " health_check_interval = \"200ms\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_ttl = \"~p\"\n" @@ -284,6 +296,47 @@ not_found_http_handler() -> {ok, Rep, State} end. +too_many_requests_http_handler() -> + GetAndBump = + fun() -> + NCalled = persistent_term:get({?MODULE, times_called}, 0), + persistent_term:put({?MODULE, times_called}, NCalled + 1), + NCalled + 1 + end, + TestPid = self(), + fun(Req0, State) -> + N = GetAndBump(), + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = + case N >= 2 of + true -> + cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"ok">>, + Req + ); + false -> + cowboy_req:reply( + 429, + #{<<"content-type">> => <<"text/plain">>}, + <<"slow down, buddy">>, + Req + ) + end, + {ok, Rep, State} + end. + +wait_http_request() -> + receive + {http, _Headers, _Req} -> + ok + after 1_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("http request not made") + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -422,13 +475,7 @@ t_path_not_found(Config) -> {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), Msg = emqx_message:make(MQTTTopic, <<"{}">>), emqx:publish(Msg), - receive - {http, _Headers, _Req} -> - ok - after 1_000 -> - ct:pal("mailbox: ~p", [process_info(self(), messages)]), - ct:fail("http request not made") - end, + wait_http_request(), ?retry( _Interval = 500, _NAttempts = 20, @@ -452,6 +499,47 @@ t_path_not_found(Config) -> ), ok. +t_too_many_requests(Config) -> + ?check_trace( + begin + #{port := Port, path := Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => Path + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + %% should retry + wait_http_request(), + wait_http_request(), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 0, + success := 1 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), + ok + end + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index dba15b335..c9438c5bd 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -642,8 +642,15 @@ transform_result(Result) -> Result; {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> Result; + {ok, _TooManyRequests = StatusCode = 429, Headers} -> + {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}}; {ok, StatusCode, Headers} -> {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}}; + {ok, _TooManyRequests = StatusCode = 429, Headers, Body} -> + {error, + {recoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}}; {ok, StatusCode, Headers, Body} -> {error, {unrecoverable_error, #{