Merge pull request #10690 from thalesmg/perf-webhook-retry-async-reply-v50

perf(webhook): add async retries and evaluate reply callback in fresh process
Thales Macedo Garitezi 2023-05-17 11:06:48 -03:00 committed by GitHub
commit b2afe4e90c
4 changed files with 167 additions and 16 deletions


@@ -23,6 +23,7 @@
 -compile(export_all).
 -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
+-import(emqx_common_test_helpers, [on_exit/1]).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -52,6 +53,13 @@ end_per_suite(_Config) ->
 suite() ->
     [{timetrap, {seconds, 60}}].
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% HTTP server for testing
 %% (Orginally copied from emqx_bridge_api_SUITE)
@@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) ->
     QueryMode = maps:get(query_mode, Config, "async"),
     ConnectTimeout = maps:get(connect_timeout, Config, 1),
     RequestTimeout = maps:get(request_timeout, Config, 10000),
-    ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
+    ResumeInterval = maps:get(resume_interval, Config, "1s"),
+    ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
     ConfigString = io_lib:format(
         "bridges.~s.~s {\n"
         " url = \"http://localhost:~p\"\n"
@@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) ->
         " health_check_interval = \"15s\"\n"
         " max_buffer_bytes = \"1GB\"\n"
         " query_mode = \"~s\"\n"
-        " request_timeout = \"~s\"\n"
+        " request_timeout = \"~p\"\n"
+        " resume_interval = \"~s\"\n"
         " start_after_created = \"true\"\n"
         " start_timeout = \"5s\"\n"
         " worker_pool_size = \"1\"\n"
@@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) ->
             PoolSize,
             RequestTimeout,
             QueryMode,
-            ResourceRequestTimeout
+            ResourceRequestTimeout,
+            ResumeInterval
         ]
     ),
     ct:pal(ConfigString),
@@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) ->
         query_mode => "async",
         connect_timeout => ResponseDelayMS * 2,
         request_timeout => 10000,
-        resouce_request_timeout => "infinity"
+        resource_request_timeout => "infinity"
     }),
     NumberOfMessagesToSend = 10,
     [
@@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) ->
     stop_http_server(Server),
     ok.
 
+t_async_free_retries(_Config) ->
+    #{port := Port} = start_http_server(#{response_delay_ms => 0}),
+    BridgeID = make_bridge(#{
+        port => Port,
+        pool_size => 1,
+        query_mode => "sync",
+        connect_timeout => 1_000,
+        request_timeout => 10_000,
+        resource_request_timeout => "10000s"
+    }),
+    %% Fail 5 times then succeed.
+    Context = #{error_attempts => 5},
+    ExpectedAttempts = 6,
+    Fn = fun(Get, Error) ->
+        ?assertMatch(
+            {ok, 200, _, _},
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    do_t_async_retries(Context, {error, normal}, Fn),
+    do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
+    ok.
+
+t_async_common_retries(_Config) ->
+    #{port := Port} = start_http_server(#{response_delay_ms => 0}),
+    BridgeID = make_bridge(#{
+        port => Port,
+        pool_size => 1,
+        query_mode => "sync",
+        resume_interval => "100ms",
+        connect_timeout => 1_000,
+        request_timeout => 10_000,
+        resource_request_timeout => "10000s"
+    }),
+    %% Keeps failing until the connector gives up.
+    Context = #{error_attempts => infinity},
+    ExpectedAttempts = 3,
+    FnSucceed = fun(Get, Error) ->
+        ?assertMatch(
+            {ok, 200, _, _},
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error, attempts => Get()}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    FnFail = fun(Get, Error) ->
+        ?assertMatch(
+            Error,
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error, attempts => Get()}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    %% These two succeed because they're further retried by the buffer
+    %% worker synchronously, and we're not mocking that call.
+    do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed),
+    do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed),
+    %% This fails because this error is treated as unrecoverable.
+    do_t_async_retries(Context, {error, something_else}, FnFail),
+    ok.
+
+do_t_async_retries(TestContext, Error, Fn) ->
+    #{error_attempts := ErrorAttempts} = TestContext,
+    persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),
+    on_exit(fun() -> persistent_term:erase({?MODULE, ?FUNCTION_NAME, attempts}) end),
+    Get = fun() -> persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}) end,
+    GetAndBump = fun() ->
+        Attempts = persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}),
+        persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, Attempts + 1),
+        Attempts + 1
+    end,
+    emqx_common_test_helpers:with_mock(
+        emqx_connector_http,
+        reply_delegator,
+        fun(Context, ReplyFunAndArgs, Result) ->
+            Attempts = GetAndBump(),
+            case Attempts > ErrorAttempts of
+                true ->
+                    ct:pal("succeeding ~p : ~p", [Error, Attempts]),
+                    meck:passthrough([Context, ReplyFunAndArgs, Result]);
+                false ->
+                    ct:pal("failing ~p : ~p", [Error, Attempts]),
+                    meck:passthrough([Context, ReplyFunAndArgs, Error])
+            end
+        end,
+        fun() -> Fn(Get, Error) end
+    ),
+    ok.
+
 receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
     ok;
 receive_request_notifications(MessageIDs, ResponseDelay) ->


@@ -32,7 +32,7 @@
     on_query/3,
     on_query_async/4,
     on_get_status/2,
-    reply_delegator/2
+    reply_delegator/3
 ]).
 
 -export([
@@ -245,10 +245,11 @@ on_query(InstId, {send_message, Msg}, State) ->
                 request_timeout := Timeout
             } = process_request(Request, Msg),
             %% bridge buffer worker has retry, do not let ehttpc retry
-            Retry = 0,
+            Retry = 2,
+            ClientId = maps:get(clientid, Msg, undefined),
             on_query(
                 InstId,
-                {undefined, Method, {Path, Headers, Body}, Timeout, Retry},
+                {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
                 State
             )
         end;
@@ -348,9 +349,10 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
                 headers := Headers,
                 request_timeout := Timeout
             } = process_request(Request, Msg),
+            ClientId = maps:get(clientid, Msg, undefined),
             on_query_async(
                 InstId,
-                {undefined, Method, {Path, Headers, Body}, Timeout},
+                {ClientId, Method, {Path, Headers, Body}, Timeout},
                 ReplyFunAndArgs,
                 State
             )
@@ -372,12 +374,22 @@ on_query_async(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
+    MaxAttempts = maps:get(max_attempts, State, 3),
+    Context = #{
+        attempt => 1,
+        max_attempts => MaxAttempts,
+        state => State,
+        key_or_num => KeyOrNum,
+        method => Method,
+        request => NRequest,
+        timeout => Timeout
+    },
     ok = ehttpc:request_async(
         Worker,
         Method,
         NRequest,
         Timeout,
-        {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
+        {fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]}
     ),
     {ok, Worker}.
@@ -598,7 +610,10 @@ to_bin(Str) when is_list(Str) ->
 to_bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
 
-reply_delegator(ReplyFunAndArgs, Result) ->
+reply_delegator(Context, ReplyFunAndArgs, Result) ->
+    spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).
+
+transform_result(Result) ->
     case Result of
         %% The normal reason happens when the HTTP connection times out before
         %% the request has been fully processed
@@ -609,16 +624,47 @@ reply_delegator(ReplyFunAndArgs, Result) ->
             Reason =:= {shutdown, normal};
             Reason =:= {shutdown, closed}
         ->
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+            {error, {recoverable_error, Reason}};
         {error, {closed, _Message} = Reason} ->
             %% _Message = "The connection was lost."
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+            {error, {recoverable_error, Reason}};
         _ ->
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
+            Result
     end.
 
+maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
+    N >= Max
+->
+    Result = transform_result(Result0),
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
+    #{
+        state := State,
+        attempt := Attempt,
+        key_or_num := KeyOrNum,
+        method := Method,
+        request := Request,
+        timeout := Timeout
+    } = Context,
+    %% TODO: reset the expiration time for free retries?
+    IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
+    NContext =
+        case IsFreeRetry of
+            true -> Context;
+            false -> Context#{attempt := Attempt + 1}
+        end,
+    Worker = resolve_pool_worker(State, KeyOrNum),
+    ok = ehttpc:request_async(
+        Worker,
+        Method,
+        Request,
+        Timeout,
+        {fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]}
+    ),
+    ok;
+maybe_retry(Result, _Context, ReplyFunAndArgs) ->
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
+
 %% The HOCON schema system may generate sensitive keys with this format
 is_sensitive_key([{str, StringKey}]) ->
     is_sensitive_key(StringKey);
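
The attempt bookkeeping above distinguishes "free" retries from counted ones: a connection abort (error reason normal or {shutdown, normal}, which happens when the connection goes away before the request has been fully processed) is re-issued without consuming the attempt budget, while any other error moves the counter toward max_attempts. A minimal sketch of just that decision, with a made-up module and function name for illustration (not part of this diff):

%% sketch_retry_budget.erl -- illustration only, not part of this PR
-module(sketch_retry_budget).
-export([next_attempt/2]).

%% Mirrors the IsFreeRetry branch of maybe_retry/3: connection aborts
%% (reason 'normal' or {shutdown, normal}) retry for free, anything else
%% consumes one attempt from the budget.
next_attempt(Reason, Attempt) ->
    IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
    case IsFreeRetry of
        true -> Attempt;
        false -> Attempt + 1
    end.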


@@ -0,0 +1,3 @@
+Added a retry mechanism to the webhook bridge that attempts to improve throughput.
+
+This optimization retries request failures without blocking the buffering layer, which can improve throughput under high message rates.
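
The mechanism behind this note is in the connector diff above: reply_delegator/3 hands the ehttpc result to a freshly spawned process, and maybe_retry/3 either re-issues ehttpc:request_async/5 with a bumped attempt counter or delivers the terminal result via emqx_resource:apply_reply_fun/2, so the buffer worker only ever sees the final outcome. A self-contained sketch of that pattern, with made-up names (call_async/3, RequestFun, ReplyFun) standing in for the real connector plumbing:

%% sketch_async_retry.erl -- illustration only, not part of this PR
-module(sketch_async_retry).
-export([call_async/3]).

%% Evaluate the request in a fresh process so the caller is never blocked,
%% and retry failures locally before delivering the final reply.
call_async(RequestFun, ReplyFun, MaxAttempts) ->
    _Pid = spawn(fun() -> attempt(RequestFun, ReplyFun, 1, MaxAttempts) end),
    ok.

attempt(RequestFun, ReplyFun, Attempt, Max) ->
    case RequestFun() of
        {error, _Reason} when Attempt < Max ->
            %% retry without involving the caller
            attempt(RequestFun, ReplyFun, Attempt + 1, Max);
        Result ->
            ReplyFun(Result)
    end.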


@@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
       {:erlcloud, github: "emqx/erlcloud", tag: "3.5.16-emqx-1", override: true},
       # erlcloud's rebar.config requires rebar3 and does not support Mix,
       # so it tries to fetch deps from git. We need to override this.
-      {:lhttpc, tag: "1.6.2", override: true},
+      {:lhttpc, "1.6.2", override: true},
       {:eini, "1.2.9", override: true},
       {:base16, "1.0.0", override: true},
       # end of erlcloud's deps