feat(gcp pubsub producer): retry on 502 and 503 http status code responses

Fixes https://emqx.atlassian.net/browse/EMQX-12625
This commit is contained in:
Thales Macedo Garitezi 2024-07-12 15:27:08 -03:00
parent ffa69df6f8
commit 0e57b39cf2
5 changed files with 175 additions and 42 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.3.1"},
{vsn, "0.3.2"},
{registered, []},
{applications, [
kernel,

View File

@ -57,7 +57,7 @@
on_format_query_result/1
]).
-export([reply_delegator/2]).
-export([reply_delegator/4]).
%%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API
@ -106,18 +106,18 @@ on_get_status(_InstanceId, #{client := Client} = _State) ->
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(ResourceId, {MessageTag, Selected}, ConnectorState) ->
on_query(ConnResId, {MessageTag, Selected}, ConnectorState) ->
Requests = [{MessageTag, Selected}],
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{
requests => Requests,
connector => ResourceId,
connector => ConnResId,
state => emqx_utils:redact(ConnectorState)
}
),
do_send_requests_sync(ConnectorState, Requests, ResourceId).
do_send_requests_sync(ConnectorState, Requests, ConnResId).
-spec on_query_async(
connector_resource_id(),
@ -125,19 +125,19 @@ on_query(ResourceId, {MessageTag, Selected}, ConnectorState) ->
{ReplyFun :: function(), Args :: list()},
connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}.
on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) ->
on_query_async(ConnResId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) ->
Requests = [{MessageTag, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{
requests => Requests,
connector => ResourceId,
connector => ConnResId,
state => emqx_utils:redact(ConnectorState)
}
),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
?tp(gcp_pubsub_producer_async, #{instance_id => ConnResId, requests => Requests}),
do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs).
-spec on_batch_query(
connector_resource_id(),
@ -147,17 +147,17 @@ on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorSta
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(ResourceId, Requests, ConnectorState) ->
on_batch_query(ConnResId, Requests, ConnectorState) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{
requests => Requests,
connector => ResourceId,
connector => ConnResId,
state => emqx_utils:redact(ConnectorState)
}
),
do_send_requests_sync(ConnectorState, Requests, ResourceId).
do_send_requests_sync(ConnectorState, Requests, ConnResId).
-spec on_batch_query_async(
connector_resource_id(),
@ -165,18 +165,18 @@ on_batch_query(ResourceId, Requests, ConnectorState) ->
{ReplyFun :: function(), Args :: list()},
connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}.
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) ->
on_batch_query_async(ConnResId, Requests, ReplyFunAndArgs, ConnectorState) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{
requests => Requests,
connector => ResourceId,
connector => ConnResId,
state => emqx_utils:redact(ConnectorState)
}
),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
?tp(gcp_pubsub_producer_async, #{instance_id => ConnResId, requests => Requests}),
do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs).
-spec on_add_channel(
connector_resource_id(),
@ -297,11 +297,12 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
handle_result(Result, Request, QueryMode, InstanceId).
-spec do_send_requests_async(
connector_resource_id(),
connector_state(),
[{message_tag(), map()}],
{ReplyFun :: function(), Args :: list()}
) -> {ok, pid()} | {error, no_pool_worker_available}.
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs0) ->
#{client := Client} = ConnectorState,
%% is it safe to assume the tag is the same??? And not empty???
[{MessageTag, _} | _] = Requests,
@ -319,7 +320,7 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
Method = post,
ReqOpts = #{request_ttl => RequestTTL},
Request = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [ConnResId, Request, ReplyFunAndArgs0]},
emqx_trace:rendered_action_template(MessageTag, #{
method => Method,
path => Path,
@ -436,7 +437,7 @@ to_pubsub_request(Payloads) ->
publish_path(#{project_id := ProjectId}, #{pubsub_topic := PubSubTopic}) ->
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
handle_result({error, Reason}, _Request, QueryMode, ResourceId) when
handle_result({error, Reason}, _Request, QueryMode, ConnResId) when
Reason =:= econnrefused;
%% this comes directly from `gun'...
Reason =:= {closed, "The connection was lost."};
@ -449,33 +450,61 @@ handle_result({error, Reason}, _Request, QueryMode, ResourceId) when
reason => Reason,
query_mode => QueryMode,
recoverable_error => true,
connector => ResourceId
connector => ConnResId
}
),
{error, {recoverable_error, Reason}};
%% HTTP 502 (Bad Gateway) / 503 (Service Unavailable) responses that carry a
%% body: instead of being returned verbatim, the response map is wrapped as
%% `{recoverable_error, Resp}'.  Clauses are tried in order, so this one has to
%% come before the generic `status_code' + `body' clause, which would
%% otherwise also match these responses.  Only an `info'-level trace point is
%% emitted here, with the request redacted before it is recorded.
handle_result(
{error, #{status_code := StatusCode, body := RespBody} = Resp},
Request,
QueryMode,
ConnResId
) when
StatusCode =:= 502;
StatusCode =:= 503
->
?tp(info, "gcp_pubsub_backoff_error_response", #{
query_mode => QueryMode,
request => emqx_bridge_http_connector:redact_request(Request),
connector => ConnResId,
status_code => StatusCode,
resp_body => RespBody
}),
{error, {recoverable_error, Resp}};
handle_result(
{error, #{status_code := StatusCode, body := RespBody}} = Result,
Request,
_QueryMode,
ResourceId
ConnResId
) ->
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId,
connector => ConnResId,
status_code => StatusCode,
resp_body => RespBody
}),
Result;
handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
%% HTTP 502 / 503 responses matched on `status_code' alone (no `body' key is
%% required by this pattern): the response map is wrapped as
%% `{recoverable_error, Resp}' and an `info'-level trace point is emitted with
%% the redacted request.  Must precede the generic `status_code'-only clause,
%% which would otherwise match first.
handle_result({error, #{status_code := StatusCode} = Resp}, Request, QueryMode, ConnResId) when
StatusCode =:= 502;
StatusCode =:= 503
->
?tp(info, "gcp_pubsub_backoff_error_response", #{
query_mode => QueryMode,
request => emqx_bridge_http_connector:redact_request(Request),
connector => ConnResId,
status_code => StatusCode
}),
{error, {recoverable_error, Resp}};
handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ConnResId) ->
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId,
connector => ConnResId,
status_code => StatusCode
}),
Result;
handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
handle_result({error, Reason} = Result, _Request, QueryMode, ConnResId) ->
?tp(
error,
gcp_pubsub_request_failed,
@ -483,11 +512,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
reason => Reason,
query_mode => QueryMode,
recoverable_error => false,
connector => ResourceId
connector => ConnResId
}
),
Result;
handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
handle_result({ok, _} = Result, _Request, _QueryMode, _ConnResId) ->
Result.
on_format_query_result({ok, Info}) ->
@ -495,16 +524,6 @@ on_format_query_result({ok, Info}) ->
on_format_query_result(Result) ->
Result.
reply_delegator(ReplyFunAndArgs, Response) ->
case Response of
{error, Reason} when
Reason =:= econnrefused;
%% this comes directly from `gun'...
Reason =:= {closed, "The connection was lost."};
Reason =:= timeout
->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response)
end.
%% Reply callback used for async requests.
%%
%% The raw `Response' is first passed through `handle_result/4' (tagged with
%% the `async' query mode), and the value it returns is what gets handed to
%% the original reply fun via `emqx_resource:apply_reply_fun/2'.
%% NOTE(review): `Response' is presumably appended as the last argument by the
%% HTTP client when it invokes the callback -- confirm against the client.
reply_delegator(ConnResId, Request, ReplyFunAndArgs, Response) ->
    emqx_resource:apply_reply_fun(
        ReplyFunAndArgs,
        handle_result(Response, Request, async, ConnResId)
    ).

View File

@ -293,7 +293,6 @@ start_echo_http_server() ->
random, HTTPPath, ServerSSLOpts
),
ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
HTTPHost = "localhost",
HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort),
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
{ok, #{

View File

@ -14,6 +14,8 @@
-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>).
-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>).
-import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
@ -65,7 +67,7 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(),
ok.
@ -137,6 +139,60 @@ assert_persisted_service_account_json_is_binary(ConnectorName) ->
),
ok.
%% Starts the echo HTTP server used as a mock GCP endpoint and switches the
%% GCP PubSub client transport to `tls' through a persistent term.
%%
%% A cleanup callback is registered with `on_exit/1' that stops the server,
%% erases the persistent term and restores the `PUBSUB_EMULATOR_HOST'
%% environment variable to the state it had before this function ran.
setup_mock_gcp_server() ->
    %% Captured before starting the server so the cleanup can restore it.
    OriginalHostPort = os:getenv("PUBSUB_EMULATOR_HOST"),
    {ok, _Server} = emqx_bridge_gcp_pubsub_producer_SUITE:start_echo_http_server(),
    persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls),
    on_exit(fun() ->
        ok = emqx_bridge_http_connector_test_server:stop(),
        persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}),
        %% `os:getenv/1' returns `false' when the variable was not set, and
        %% `os:putenv/2' only accepts strings; in that case the variable has
        %% to be removed again instead of being "restored" to `false'.
        case OriginalHostPort of
            false ->
                true = os:unsetenv("PUBSUB_EMULATOR_HOST");
            _ ->
                true = os:putenv("PUBSUB_EMULATOR_HOST", OriginalHostPort)
        end
    end),
    ok.
%% Builds a handler fun (`fun(Req, State) -> {ok, Reply, State}') that fails a
%% fixed number of times before succeeding.  Only meant for error status
%% codes.
%%
%% * `GET' requests whose path starts with `/v1/projects/myproject/topics/'
%%   are always answered with 200 and an empty JSON object.
%% * Every other request is counted per distinct `{Method, Path, Body}'.  The
%%   first `FailureAttempts' occurrences are answered with `StatusCode' and
%%   emit a `request_fail' trace point; later occurrences are answered with
%%   200 and emit `retry_succeeded'.
fixed_status_handler(StatusCode, FailureAttempts) ->
    %% `public': any process may update the counters, not only the creator.
    Counters = ets:new(requests, [public]),
    JsonHeaders = #{<<"content-type">> => <<"application/json">>},
    %% Atomically increments and returns the number of times `Key' was seen.
    CountAttempt = fun(Key) ->
        ets:update_counter(Counters, Key, 1, {Key, 0})
    end,
    fun(Req0, State) ->
        Method = cowboy_req:method(Req0),
        Path = cowboy_req:path(Req0),
        Reply =
            case {Method, Path} of
                {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
                    cowboy_req:reply(200, JsonHeaders, <<"{}">>, Req0);
                _ ->
                    {ok, Body, Req} = cowboy_req:read_body(Req0),
                    Attempt = CountAttempt({Method, Path, Body}),
                    if
                        Attempt > FailureAttempts ->
                            ?tp(retry_succeeded, #{body => Body, method => Method, path => Path}),
                            cowboy_req:reply(
                                200,
                                JsonHeaders,
                                emqx_utils_json:encode(#{<<"gcp">> => <<"is back up">>}),
                                Req
                            );
                        true ->
                            ?tp(request_fail, #{body => Body, method => Method, path => Path}),
                            cowboy_req:reply(
                                StatusCode,
                                JsonHeaders,
                                emqx_utils_json:encode(#{<<"gcp">> => <<"is down">>}),
                                Req
                            )
                    end
            end,
        {ok, Reply, State}
    end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -213,3 +269,61 @@ t_bad_topic(Config) ->
[]
),
ok.
%% Verifies the backoff and retry behavior when we receive a 502 or 503 error back.
%%
%% Outline: start the mock GCP server, create the bridge and a rule feeding it,
%% wait until the action reports `connected', then for each of 502 and 503
%% install a handler that fails the first two identical requests and publish
%% one message.  The test passes once the handler emits `retry_succeeded',
%% i.e. once a third attempt of the same request has been answered with 200.
t_backoff_retry(Config) ->
%% Mock server; its cleanup is registered via `on_exit' inside the helper.
setup_mock_gcp_server(),
ActionName = ?config(action_name, Config),
?check_trace(
#{timetrap => 10_000},
begin
%% Create the bridge with 1s health-check and resume intervals.
?assertMatch(
{ok, {{_, 201, _}, _, #{}}},
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"1s">>,
<<"resume_interval">> => <<"1s">>
}
}
)
),
RuleTopic = <<"backoff/retry">>,
{ok, _} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, RuleTopic, Config
),
%% Poll the bridge API until the action status is `connected'.
?retry(
200,
10,
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"status">> := <<"connected">>
}}},
emqx_bridge_v2_testlib:get_bridge_api(?ACTION_TYPE_BIN, ActionName)
)
),
{ok, C} = emqtt:start_link(#{}),
{ok, _} = emqtt:connect(C),
%% 502: the first 2 attempts of a given request fail, the 3rd gets a 200.
ok = emqx_bridge_http_connector_test_server:set_handler(fixed_status_handler(502, 2)),
{{ok, _}, {ok, _}} =
?wait_async_action(
emqtt:publish(C, RuleTopic, <<"{}">>, [{qos, 1}]),
#{?snk_kind := retry_succeeded}
),
%% 503: same scenario; the new handler starts with fresh attempt counters.
ok = emqx_bridge_http_connector_test_server:set_handler(fixed_status_handler(503, 2)),
{{ok, _}, {ok, _}} =
?wait_async_action(
emqtt:publish(C, RuleTopic, <<"{}">>, [{qos, 1}]),
#{?snk_kind := retry_succeeded}
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1 @@
Now, when the GCP PubSub Producer action receives a response from PubSub with HTTP status code 502 or 503, it retries the request until it succeeds or the message TTL is reached.