diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index f08c87b6e..45cc82251 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -23,6 +23,7 @@ -compile(export_all). -import(emqx_mgmt_api_test_util, [request/3, uri/1]). +-import(emqx_common_test_helpers, [on_exit/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -52,6 +53,13 @@ end_per_suite(_Config) -> suite() -> [{timetrap, {seconds, 60}}]. +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), + ok. + %%------------------------------------------------------------------------------ %% HTTP server for testing %% (Orginally copied from emqx_bridge_api_SUITE) @@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) -> QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, 1), RequestTimeout = maps:get(request_timeout, Config, 10000), - ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"), + ResumeInterval = maps:get(resume_interval, Config, "1s"), + ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" @@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) -> " health_check_interval = \"15s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" - " request_timeout = \"~s\"\n" + " request_timeout = \"~p\"\n" + " resume_interval = \"~s\"\n" " start_after_created = \"true\"\n" " start_timeout = \"5s\"\n" " worker_pool_size = \"1\"\n" @@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) -> PoolSize, RequestTimeout, QueryMode, - ResourceRequestTimeout + ResourceRequestTimeout, + ResumeInterval ] ), ct:pal(ConfigString), @@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) -> query_mode => "async", connect_timeout => ResponseDelayMS * 2, request_timeout => 10000, - resouce_request_timeout => "infinity" + resource_request_timeout => "infinity" }), NumberOfMessagesToSend = 10, [ @@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) -> stop_http_server(Server), ok. +t_async_free_retries(_Config) -> + #{port := Port} = start_http_server(#{response_delay_ms => 0}), + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "sync", + connect_timeout => 1_000, + request_timeout => 10_000, + resource_request_timeout => "10000s" + }), + %% Fail 5 times then succeed. + Context = #{error_attempts => 5}, + ExpectedAttempts = 6, + Fn = fun(Get, Error) -> + ?assertMatch( + {ok, 200, _, _}, + emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + #{error => Error} + ), + ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) + end, + do_t_async_retries(Context, {error, normal}, Fn), + do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), + ok. + +t_async_common_retries(_Config) -> + #{port := Port} = start_http_server(#{response_delay_ms => 0}), + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "sync", + resume_interval => "100ms", + connect_timeout => 1_000, + request_timeout => 10_000, + resource_request_timeout => "10000s" + }), + %% Keeps failing until connector gives up. + Context = #{error_attempts => infinity}, + ExpectedAttempts = 3, + FnSucceed = fun(Get, Error) -> + ?assertMatch( + {ok, 200, _, _}, + emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + #{error => Error, attempts => Get()} + ), + ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) + end, + FnFail = fun(Get, Error) -> + ?assertMatch( + Error, + emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + #{error => Error, attempts => Get()} + ), + ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) + end, + %% These two succeed because they're further retried by the buffer + %% worker synchronously, and we're not mock that call. + do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed), + do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed), + %% This fails because this error is treated as unrecoverable. + do_t_async_retries(Context, {error, something_else}, FnFail), + ok. + +do_t_async_retries(TestContext, Error, Fn) -> + #{error_attempts := ErrorAttempts} = TestContext, + persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), + on_exit(fun() -> persistent_term:erase({?MODULE, ?FUNCTION_NAME, attempts}) end), + Get = fun() -> persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}) end, + GetAndBump = fun() -> + Attempts = persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}), + persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, Attempts + 1), + Attempts + 1 + end, + emqx_common_test_helpers:with_mock( + emqx_connector_http, + reply_delegator, + fun(Context, ReplyFunAndArgs, Result) -> + Attempts = GetAndBump(), + case Attempts > ErrorAttempts of + true -> + ct:pal("succeeding ~p : ~p", [Error, Attempts]), + meck:passthrough([Context, ReplyFunAndArgs, Result]); + false -> + ct:pal("failing ~p : ~p", [Error, Attempts]), + meck:passthrough([Context, ReplyFunAndArgs, Error]) + end + end, + fun() -> Fn(Get, Error) end + ), + ok. + receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 -> ok; receive_request_notifications(MessageIDs, ResponseDelay) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7218258e8..6ecfd0b59 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -32,7 +32,7 @@ on_query/3, on_query_async/4, on_get_status/2, - reply_delegator/2 + reply_delegator/3 ]). -export([ @@ -245,10 +245,11 @@ on_query(InstId, {send_message, Msg}, State) -> request_timeout := Timeout } = process_request(Request, Msg), %% bridge buffer worker has retry, do not let ehttpc retry - Retry = 0, + Retry = 2, + ClientId = maps:get(clientid, Msg, undefined), on_query( InstId, - {undefined, Method, {Path, Headers, Body}, Timeout, Retry}, + {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, State ) end; @@ -348,9 +349,10 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> headers := Headers, request_timeout := Timeout } = process_request(Request, Msg), + ClientId = maps:get(clientid, Msg, undefined), on_query_async( InstId, - {undefined, Method, {Path, Headers, Body}, Timeout}, + {ClientId, Method, {Path, Headers, Body}, Timeout}, ReplyFunAndArgs, State ) @@ -372,12 +374,22 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), + MaxAttempts = maps:get(max_attempts, State, 3), + Context = #{ + attempt => 1, + max_attempts => MaxAttempts, + state => State, + key_or_num => KeyOrNum, + method => Method, + request => NRequest, + timeout => Timeout + }, ok = ehttpc:request_async( Worker, Method, NRequest, Timeout, - {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]} + {fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]} ), {ok, Worker}. @@ -598,7 +610,10 @@ to_bin(Str) when is_list(Str) -> to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -reply_delegator(ReplyFunAndArgs, Result) -> +reply_delegator(Context, ReplyFunAndArgs, Result) -> + spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end). + +transform_result(Result) -> case Result of %% The normal reason happens when the HTTP connection times out before %% the request has been fully processed @@ -609,16 +624,47 @@ reply_delegator(ReplyFunAndArgs, Result) -> Reason =:= {shutdown, normal}; Reason =:= {shutdown, closed} -> - Result1 = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + {error, {recoverable_error, Reason}}; {error, {closed, _Message} = Reason} -> %% _Message = "The connection was lost." - Result1 = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + {error, {recoverable_error, Reason}}; _ -> - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + Result end. +maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when + N >= Max +-> + Result = transform_result(Result0), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> + #{ + state := State, + attempt := Attempt, + key_or_num := KeyOrNum, + method := Method, + request := Request, + timeout := Timeout + } = Context, + %% TODO: reset the expiration time for free retries? + IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal}, + NContext = + case IsFreeRetry of + true -> Context; + false -> Context#{attempt := Attempt + 1} + end, + Worker = resolve_pool_worker(State, KeyOrNum), + ok = ehttpc:request_async( + Worker, + Method, + Request, + Timeout, + {fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]} + ), + ok; +maybe_retry(Result, _Context, ReplyFunAndArgs) -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). + %% The HOCON schema system may generate sensitive keys with this format is_sensitive_key([{str, StringKey}]) -> is_sensitive_key(StringKey); diff --git a/changes/ce/perf-10690.en.md b/changes/ce/perf-10690.en.md new file mode 100644 index 000000000..4f7c49e53 --- /dev/null +++ b/changes/ce/perf-10690.en.md @@ -0,0 +1,3 @@ +Added a retry mechanism to webhook bridge that attempts to improve throughput. + +This optimization retries request failures without blocking the buffering layer, which can improve throughput in situations of high messaging rate. diff --git a/mix.exs b/mix.exs index 28fa2c6cc..717172041 100644 --- a/mix.exs +++ b/mix.exs @@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do {:erlcloud, github: "emqx/erlcloud", tag: "3.5.16-emqx-1", override: true}, # erlcloud's rebar.config requires rebar3 and does not support Mix, # so it tries to fetch deps from git. We need to override this. - {:lhttpc, tag: "1.6.2", override: true}, + {:lhttpc, "1.6.2", override: true}, {:eini, "1.2.9", override: true}, {:base16, "1.0.0", override: true}, # end of erlcloud's deps