From 0e57b39cf2050ada74c2684b2235821807565b97 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 12 Jul 2024 15:27:08 -0300 Subject: [PATCH] feat(gcp pubsub producer): retry on 502 and 503 http status code responses Fixes https://emqx.atlassian.net/browse/EMQX-12625 --- .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 97 +++++++++------ .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 1 - ...qx_bridge_v2_gcp_pubsub_producer_SUITE.erl | 116 +++++++++++++++++- changes/ee/feat-13463.en.md | 1 + 5 files changed, 175 insertions(+), 42 deletions(-) create mode 100644 changes/ee/feat-13463.en.md diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index d98355a90..eff7847f2 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 48e50c416..0c668cb95 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -57,7 +57,7 @@ on_format_query_result/1 ]). --export([reply_delegator/2]). +-export([reply_delegator/4]). %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -106,18 +106,18 @@ on_get_status(_InstanceId, #{client := Client} = _State) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {MessageTag, Selected}, ConnectorState) -> +on_query(ConnResId, {MessageTag, Selected}, ConnectorState) -> Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", #{ requests => Requests, - connector => ResourceId, + connector => ConnResId, state => emqx_utils:redact(ConnectorState) } ), - do_send_requests_sync(ConnectorState, Requests, ResourceId). + do_send_requests_sync(ConnectorState, Requests, ConnResId). -spec on_query_async( connector_resource_id(), @@ -125,19 +125,19 @@ on_query(ResourceId, {MessageTag, Selected}, ConnectorState) -> {ReplyFun :: function(), Args :: list()}, connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. -on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) -> +on_query_async(ConnResId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) -> Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", #{ requests => Requests, - connector => ResourceId, + connector => ConnResId, state => emqx_utils:redact(ConnectorState) } ), - ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), - do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs). + ?tp(gcp_pubsub_producer_async, #{instance_id => ConnResId, requests => Requests}), + do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs). -spec on_batch_query( connector_resource_id(), @@ -147,17 +147,17 @@ on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorSta {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(ResourceId, Requests, ConnectorState) -> +on_batch_query(ConnResId, Requests, ConnectorState) -> ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", #{ requests => Requests, - connector => ResourceId, + connector => ConnResId, state => emqx_utils:redact(ConnectorState) } ), - do_send_requests_sync(ConnectorState, Requests, ResourceId). + do_send_requests_sync(ConnectorState, Requests, ConnResId). -spec on_batch_query_async( connector_resource_id(), @@ -165,18 +165,18 @@ on_batch_query(ResourceId, Requests, ConnectorState) -> {ReplyFun :: function(), Args :: list()}, connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. -on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) -> +on_batch_query_async(ConnResId, Requests, ReplyFunAndArgs, ConnectorState) -> ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", #{ requests => Requests, - connector => ResourceId, + connector => ConnResId, state => emqx_utils:redact(ConnectorState) } ), - ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), - do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs). + ?tp(gcp_pubsub_producer_async, #{instance_id => ConnResId, requests => Requests}), + do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs). -spec on_add_channel( connector_resource_id(), @@ -297,11 +297,12 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) -> handle_result(Result, Request, QueryMode, InstanceId). -spec do_send_requests_async( + connector_resource_id(), connector_state(), [{message_tag(), map()}], {ReplyFun :: function(), Args :: list()} ) -> {ok, pid()} | {error, no_pool_worker_available}. -do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> +do_send_requests_async(ConnResId, ConnectorState, Requests, ReplyFunAndArgs0) -> #{client := Client} = ConnectorState, %% is it safe to assume the tag is the same??? And not empty??? [{MessageTag, _} | _] = Requests, @@ -319,7 +320,7 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> Method = post, ReqOpts = #{request_ttl => RequestTTL}, Request = {prepared_request, {Method, Path, Body}, ReqOpts}, - ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, + ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [ConnResId, Request, ReplyFunAndArgs0]}, emqx_trace:rendered_action_template(MessageTag, #{ method => Method, path => Path, @@ -436,7 +437,7 @@ to_pubsub_request(Payloads) -> publish_path(#{project_id := ProjectId}, #{pubsub_topic := PubSubTopic}) -> <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. -handle_result({error, Reason}, _Request, QueryMode, ResourceId) when +handle_result({error, Reason}, _Request, QueryMode, ConnResId) when Reason =:= econnrefused; %% this comes directly from `gun'... Reason =:= {closed, "The connection was lost."}; @@ -449,33 +450,61 @@ handle_result({error, Reason}, _Request, QueryMode, ResourceId) when reason => Reason, query_mode => QueryMode, recoverable_error => true, - connector => ResourceId + connector => ConnResId } ), {error, {recoverable_error, Reason}}; +handle_result( + {error, #{status_code := StatusCode, body := RespBody} = Resp}, + Request, + QueryMode, + ConnResId +) when + StatusCode =:= 502; + StatusCode =:= 503 +-> + ?tp(info, "gcp_pubsub_backoff_error_response", #{ + query_mode => QueryMode, + request => emqx_bridge_http_connector:redact_request(Request), + connector => ConnResId, + status_code => StatusCode, + resp_body => RespBody + }), + {error, {recoverable_error, Resp}}; handle_result( {error, #{status_code := StatusCode, body := RespBody}} = Result, Request, _QueryMode, - ResourceId + ConnResId ) -> ?SLOG(error, #{ msg => "gcp_pubsub_error_response", request => emqx_bridge_http_connector:redact_request(Request), - connector => ResourceId, + connector => ConnResId, status_code => StatusCode, resp_body => RespBody }), Result; -handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) -> +handle_result({error, #{status_code := StatusCode} = Resp}, Request, QueryMode, ConnResId) when + StatusCode =:= 502; + StatusCode =:= 503 +-> + ?tp(info, "gcp_pubsub_backoff_error_response", #{ + query_mode => QueryMode, + request => emqx_bridge_http_connector:redact_request(Request), + connector => ConnResId, + status_code => StatusCode + }), + {error, {recoverable_error, Resp}}; +handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ConnResId) -> ?SLOG(error, #{ msg => "gcp_pubsub_error_response", request => emqx_bridge_http_connector:redact_request(Request), - connector => ResourceId, + connector => ConnResId, status_code => StatusCode }), Result; -handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) -> +handle_result({error, Reason} = Result, _Request, QueryMode, ConnResId) -> ?tp( error, gcp_pubsub_request_failed, @@ -483,11 +512,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) -> reason => Reason, query_mode => QueryMode, recoverable_error => false, - connector => ResourceId + connector => ConnResId } ), Result; -handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) -> +handle_result({ok, _} = Result, _Request, _QueryMode, _ConnResId) -> Result. on_format_query_result({ok, Info}) -> @@ -495,16 +524,6 @@ on_format_query_result({ok, Info}) -> on_format_query_result(Result) -> Result. -reply_delegator(ReplyFunAndArgs, Response) -> - case Response of - {error, Reason} when - Reason =:= econnrefused; - %% this comes directly from `gun'... - Reason =:= {closed, "The connection was lost."}; - Reason =:= timeout - -> - Result = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); - _ -> - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response) - end. +reply_delegator(ConnResId, Request, ReplyFunAndArgs, Response) -> + Result = handle_result(Response, Request, async, ConnResId), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index d96157f8c..5c8316fc2 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -293,7 +293,6 @@ start_echo_http_server() -> random, HTTPPath, ServerSSLOpts ), ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()), - HTTPHost = "localhost", HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort), true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), {ok, #{ diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl index f2255c343..aefba12c5 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl @@ -14,6 +14,8 @@ -define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>). -define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>). +-import(emqx_common_test_helpers, [on_exit/1]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -65,7 +67,7 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), - emqx_common_test_helpers:call_janitor(60_000), + emqx_common_test_helpers:call_janitor(), ok = snabbkaffe:stop(), ok. @@ -137,6 +139,60 @@ assert_persisted_service_account_json_is_binary(ConnectorName) -> ), ok. +setup_mock_gcp_server() -> + OriginalHostPort = os:getenv("PUBSUB_EMULATOR_HOST"), + {ok, _Server} = emqx_bridge_gcp_pubsub_producer_SUITE:start_echo_http_server(), + persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls), + on_exit(fun() -> + ok = emqx_bridge_http_connector_test_server:stop(), + persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}), + true = os:putenv("PUBSUB_EMULATOR_HOST", OriginalHostPort) + end), + ok. + +%% Only for errors +fixed_status_handler(StatusCode, FailureAttempts) -> + Tid = ets:new(requests, [public]), + GetAndBump = fun(Method, Path, Body) -> + K = {Method, Path, Body}, + ets:update_counter(Tid, K, 1, {K, 0}) + end, + fun(Req0, State) -> + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + {Method, Path} -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + N = GetAndBump(Method, Path, Body), + Rep = + case N > FailureAttempts of + false -> + ?tp(request_fail, #{body => Body, method => Method, path => Path}), + cowboy_req:reply( + StatusCode, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{<<"gcp">> => <<"is down">>}), + Req + ); + true -> + ?tp(retry_succeeded, #{body => Body, method => Method, path => Path}), + cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{<<"gcp">> => <<"is back up">>}), + Req + ) + end, + {ok, Rep, State} + end + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -213,3 +269,61 @@ t_bad_topic(Config) -> [] ), ok. + +%% Verifies the backoff and retry behavior when we receive a 502 or 503 error back. +t_backoff_retry(Config) -> + setup_mock_gcp_server(), + ActionName = ?config(action_name, Config), + ?check_trace( + #{timetrap => 10_000}, + begin + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{ + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">>, + <<"resume_interval">> => <<"1s">> + } + } + ) + ), + RuleTopic = <<"backoff/retry">>, + {ok, _} = + emqx_bridge_v2_testlib:create_rule_and_action_http( + ?ACTION_TYPE_BIN, RuleTopic, Config + ), + ?retry( + 200, + 10, + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"connected">> + }}}, + emqx_bridge_v2_testlib:get_bridge_api(?ACTION_TYPE_BIN, ActionName) + ) + ), + {ok, C} = emqtt:start_link(#{}), + {ok, _} = emqtt:connect(C), + + ok = emqx_bridge_http_connector_test_server:set_handler(fixed_status_handler(502, 2)), + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"{}">>, [{qos, 1}]), + #{?snk_kind := retry_succeeded} + ), + + ok = emqx_bridge_http_connector_test_server:set_handler(fixed_status_handler(503, 2)), + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"{}">>, [{qos, 1}]), + #{?snk_kind := retry_succeeded} + ), + + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-13463.en.md b/changes/ee/feat-13463.en.md new file mode 100644 index 000000000..d921b055e --- /dev/null +++ b/changes/ee/feat-13463.en.md @@ -0,0 +1 @@ +Now, when GCP PubSub Producer action receives a response from PubSub with the HTTP status codes 502 or 503, it'll retry the request until it succeeds or the message TTL is reached.