Merge pull request #11162 from thalesmg/treat-404-as-failure-master

fix(webhook): treat 404 and other error replies as errors in async requests
This commit is contained in:
Thales Macedo Garitezi 2023-06-30 13:13:18 -03:00 committed by GitHub
commit 6fe6aa7997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 237 additions and 56 deletions

View File

@ -27,6 +27,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
@ -60,10 +61,35 @@ init_per_testcase(t_send_async_connection_timeout, Config) ->
ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
init_per_testcase(t_path_not_found, Config) ->
HTTPPath = <<"/nonexisting/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(t_too_many_requests, Config) ->
HTTPPath = <<"/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(_TestCase, Config) ->
Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].
end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found;
TestCase =:= t_too_many_requests
->
ok = emqx_connector_web_hook_server:stop(),
persistent_term:erase({?MODULE, times_called}),
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(),
ok;
end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
@ -176,27 +202,37 @@ parse_http_request_assertive(ReqStr0) ->
bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, ?BRIDGE_TYPE),
Name = maps:get(name, Config, ?BRIDGE_NAME),
Path = maps:get(path, Config, ""),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
RequestTimeout = maps:get(request_timeout, Config, "10s"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
LocalTopic =
case maps:find(local_topic, Config) of
{ok, LT} ->
lists:flatten(["local_topic = \"", LT, "\""]);
error ->
""
end,
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
" url = \"http://localhost:~p~s\"\n"
" connect_timeout = \"~p\"\n"
" enable = true\n"
%% local_topic
" ~s\n"
" enable_pipelining = 100\n"
" max_retries = 2\n"
" method = \"post\"\n"
" pool_size = ~p\n"
" pool_type = \"random\"\n"
" request_timeout = \"~s\"\n"
" body = \"${id}\""
" body = \"${id}\"\n"
" resource_opts {\n"
" inflight_window = 100\n"
" health_check_interval = \"15s\"\n"
" health_check_interval = \"200ms\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_ttl = \"~p\"\n"
@ -213,7 +249,9 @@ bridge_async_config(#{port := Port} = Config) ->
Type,
Name,
Port,
Path,
ConnectTimeout,
LocalTopic,
PoolSize,
RequestTimeout,
QueryMode,
@ -244,6 +282,61 @@ make_bridge(Config) ->
),
emqx_bridge_resource:bridge_id(Type, Name).
not_found_http_handler() ->
TestPid = self(),
fun(Req0, State) ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
Rep = cowboy_req:reply(
404,
#{<<"content-type">> => <<"text/plain">>},
<<"not found">>,
Req
),
{ok, Rep, State}
end.
too_many_requests_http_handler() ->
GetAndBump =
fun() ->
NCalled = persistent_term:get({?MODULE, times_called}, 0),
persistent_term:put({?MODULE, times_called}, NCalled + 1),
NCalled + 1
end,
TestPid = self(),
fun(Req0, State) ->
N = GetAndBump(),
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
Rep =
case N >= 2 of
true ->
cowboy_req:reply(
200,
#{<<"content-type">> => <<"text/plain">>},
<<"ok">>,
Req
);
false ->
cowboy_req:reply(
429,
#{<<"content-type">> => <<"text/plain">>},
<<"slow down, buddy">>,
Req
)
end,
{ok, Rep, State}
end.
wait_http_request() ->
receive
{http, _Headers, _Req} ->
ok
after 1_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("http request not made")
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -367,6 +460,86 @@ t_start_stop(Config) ->
?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped
).
t_path_not_found(Config) ->
?check_trace(
begin
#{port := Port, path := Path} = ?config(http_server, Config),
MQTTTopic = <<"t/webhook">>,
BridgeConfig = bridge_async_config(#{
type => ?BRIDGE_TYPE,
name => ?BRIDGE_NAME,
local_topic => MQTTTopic,
port => Port,
path => Path
}),
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
Msg = emqx_message:make(MQTTTopic, <<"{}">>),
emqx:publish(Msg),
wait_http_request(),
?retry(
_Interval = 500,
_NAttempts = 20,
?assertMatch(
#{
counters := #{
matched := 1,
failed := 1,
success := 0
}
},
emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
)
),
ok
end,
fun(Trace) ->
?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)),
ok
end
),
ok.
t_too_many_requests(Config) ->
?check_trace(
begin
#{port := Port, path := Path} = ?config(http_server, Config),
MQTTTopic = <<"t/webhook">>,
BridgeConfig = bridge_async_config(#{
type => ?BRIDGE_TYPE,
name => ?BRIDGE_NAME,
local_topic => MQTTTopic,
port => Port,
path => Path
}),
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
Msg = emqx_message:make(MQTTTopic, <<"{}">>),
emqx:publish(Msg),
%% should retry
wait_http_request(),
wait_http_request(),
?retry(
_Interval = 500,
_NAttempts = 20,
?assertMatch(
#{
counters := #{
matched := 1,
failed := 0,
success := 1
}
},
emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
)
),
ok
end,
fun(Trace) ->
?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)),
ok
end
),
ok.
%% helpers
do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,

View File

@ -301,57 +301,23 @@ on_query(
),
NRequest = formalize_request(Method, BasePath, Request),
Worker = resolve_pool_worker(State, KeyOrNum),
case
ehttpc:request(
Worker,
Method,
NRequest,
Timeout,
Retry
)
of
{error, Reason} when
Reason =:= econnrefused;
Reason =:= timeout;
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
Result0 = ehttpc:request(
Worker,
Method,
NRequest,
Timeout,
Retry
),
Result = transform_result(Result0),
case Result of
{error, {recoverable_error, Reason}} ->
?SLOG(warning, #{
msg => "http_connector_do_request_failed",
reason => Reason,
connector => InstId
}),
{error, {recoverable_error, Reason}};
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
?SLOG(warning, #{
msg => "http_connector_do_request_failed",
reason => Reason,
connector => InstId
}),
{error, {recoverable_error, Reason}};
{error, Reason} = Result ->
?SLOG(error, #{
msg => "http_connector_do_request_failed",
request => redact(NRequest),
reason => Reason,
connector => InstId
}),
Result;
{ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
Result;
{ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
Result;
{ok, StatusCode, Headers} ->
?SLOG(error, #{
msg => "http connector do request, received error response",
note => "the body will be redacted due to security reasons",
request => redact_request(NRequest),
connector => InstId,
status_code => StatusCode
}),
{error, #{status_code => StatusCode, headers => Headers}};
{ok, StatusCode, Headers, Body} ->
{error, #{status_code := StatusCode}} ->
?SLOG(error, #{
msg => "http connector do request, received error response.",
note => "the body will be redacted due to security reasons",
@ -359,7 +325,17 @@ on_query(
connector => InstId,
status_code => StatusCode
}),
{error, #{status_code => StatusCode, headers => Headers, body => Body}}
Result;
{error, Reason} ->
?SLOG(error, #{
msg => "http_connector_do_request_failed",
request => redact(NRequest),
reason => Reason,
connector => InstId
}),
Result;
_Success ->
Result
end.
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
@ -639,8 +615,11 @@ to_bin(Str) when is_list(Str) ->
to_bin(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8).
reply_delegator(Context, ReplyFunAndArgs, Result) ->
spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).
reply_delegator(Context, ReplyFunAndArgs, Result0) ->
spawn(fun() ->
Result = transform_result(Result0),
maybe_retry(Result, Context, ReplyFunAndArgs)
end).
transform_result(Result) ->
case Result of
@ -657,14 +636,36 @@ transform_result(Result) ->
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
{error, {recoverable_error, Reason}};
_ ->
Result
{error, _Reason} ->
Result;
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
Result;
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
Result;
{ok, _TooManyRequests = StatusCode = 429, Headers} ->
{error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}};
{ok, StatusCode, Headers} ->
{error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}};
{ok, _TooManyRequests = StatusCode = 429, Headers, Body} ->
{error,
{recoverable_error, #{
status_code => StatusCode, headers => Headers, body => Body
}}};
{ok, StatusCode, Headers, Body} ->
{error,
{unrecoverable_error, #{
status_code => StatusCode, headers => Headers, body => Body
}}}
end.
maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
maybe_retry(Result, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
N >= Max
->
Result = transform_result(Result0),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry(
{error, {unrecoverable_error, #{status_code := _}}} = Result, _Context, ReplyFunAndArgs
) ->
%% request was successful, but we got an error response; no need to retry
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
#{
@ -676,12 +677,18 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
timeout := Timeout
} = Context,
%% TODO: reset the expiration time for free retries?
IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
IsFreeRetry =
case Reason of
{recoverable_error, normal} -> true;
{recoverable_error, {shutdown, normal}} -> true;
_ -> false
end,
NContext =
case IsFreeRetry of
true -> Context;
false -> Context#{attempt := Attempt + 1}
end,
?tp(webhook_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async(
Worker,

View File

@ -0,0 +1 @@
Fixed an issue in webhook bridge where, in async query mode, HTTP status codes like 4XX and 5XX would be treated as successes in the bridge metrics.