diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 4595358d7..f651bc95d 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -286,98 +286,14 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI ), Headers = get_jwt_authorization_header(JWTConfig), Request = {Path, Headers, Body}, - case - ehttpc:request( - PoolName, - Method, - Request, - RequestTTL, - MaxRetries - ) - of - {error, Reason} when - Reason =:= econnrefused; - %% this comes directly from `gun'... - Reason =:= {closed, "The connection was lost."}; - Reason =:= timeout - -> - ?tp( - warning, - gcp_pubsub_request_failed, - #{ - reason => Reason, - query_mode => sync, - recoverable_error => true, - connector => ResourceId - } - ), - {error, {recoverable_error, Reason}}; - {error, Reason} = Result -> - ?tp( - error, - gcp_pubsub_request_failed, - #{ - reason => Reason, - query_mode => sync, - recoverable_error => false, - connector => ResourceId - } - ), - Result; - {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - ?tp( - gcp_pubsub_response, - #{ - response => Result, - query_mode => sync, - connector => ResourceId - } - ), - Result; - {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> - ?tp( - gcp_pubsub_response, - #{ - response => Result, - query_mode => sync, - connector => ResourceId - } - ), - Result; - {ok, StatusCode, RespHeaders} = _Result -> - ?tp( - gcp_pubsub_response, - #{ - response => _Result, - query_mode => sync, - connector => ResourceId - } - ), - ?SLOG(error, #{ - msg => "gcp_pubsub_error_response", - request => emqx_connector_http:redact_request(Request), - connector => ResourceId, - status_code => StatusCode - }), - {error, #{status_code => StatusCode, headers => RespHeaders}}; - {ok, StatusCode, RespHeaders, RespBody} = _Result -> - ?tp( - gcp_pubsub_response, - #{ - response => _Result, - query_mode => sync, - connector => ResourceId - } - ), - ?SLOG(error, #{ - msg => "gcp_pubsub_error_response", - request => emqx_connector_http:redact_request(Request), - connector => ResourceId, - status_code => StatusCode, - resp_body => RespBody - }), - {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}} - end. + Response = ehttpc:request( + PoolName, + Method, + Request, + RequestTTL, + MaxRetries + ), + handle_response(Response, ResourceId, _QueryMode = sync). -spec do_send_requests_async( state(), @@ -412,41 +328,70 @@ do_send_requests_async( ), {ok, Worker}. +handle_response(Result, ResourceId, QueryMode) -> + case Result of + {error, Reason} -> + ?tp( + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => QueryMode, + connector => ResourceId + } + ), + {error, Reason}; + {ok, StatusCode, RespHeaders} when StatusCode >= 200 andalso StatusCode < 300 -> + ?tp( + gcp_pubsub_response, + #{ + response => Result, + query_mode => QueryMode, + connector => ResourceId + } + ), + {ok, #{status_code => StatusCode, headers => RespHeaders}}; + {ok, StatusCode, RespHeaders, RespBody} when + StatusCode >= 200 andalso StatusCode < 300 + -> + ?tp( + gcp_pubsub_response, + #{ + response => Result, + query_mode => QueryMode, + connector => ResourceId + } + ), + {ok, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}}; + {ok, StatusCode, RespHeaders} = _Result -> + ?tp( + gcp_pubsub_response, + #{ + response => _Result, + query_mode => QueryMode, + connector => ResourceId + } + ), + {error, #{status_code => StatusCode, headers => RespHeaders}}; + {ok, StatusCode, RespHeaders, RespBody} = _Result -> + ?tp( + gcp_pubsub_response, + #{ + response => _Result, + query_mode => QueryMode, + connector => ResourceId + } + ), + {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}} + end. + -spec reply_delegator( resource_id(), {ReplyFun :: function(), Args :: list()}, term() | {error, econnrefused | timeout | term()} ) -> ok. -reply_delegator(_ResourceId, ReplyFunAndArgs, Result) -> - case Result of - {error, Reason} when - Reason =:= econnrefused; - %% this comes directly from `gun'... - Reason =:= {closed, "The connection was lost."}; - Reason =:= timeout - -> - ?tp( - gcp_pubsub_request_failed, - #{ - reason => Reason, - query_mode => async, - recoverable_error => true, - connector => _ResourceId - } - ), - Result1 = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); - _ -> - ?tp( - gcp_pubsub_response, - #{ - response => Result, - query_mode => async, - connector => _ResourceId - } - ), - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) - end. +reply_delegator(ResourceId, ReplyFunAndArgs, Response) -> + Result = handle_response(Response, ResourceId, _QueryMode = async), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). -spec do_get_status(resource_id(), timer:time()) -> boolean(). do_get_status(ResourceId, Timeout) -> diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 840a96ac1..6504ffb59 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -6,6 +6,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -type config() :: #{ connect_timeout := emqx_schema:duration_ms(), @@ -37,6 +38,8 @@ on_get_status/2 ]). +-export([reply_delegator/2]). + %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- @@ -163,7 +166,9 @@ do_send_requests_sync(State, Requests, InstanceId) -> Path = publish_path(State), Method = post, Request = {prepared_request, {Method, Path, Body}}, - emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState). + Result = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState), + QueryMode = sync, + handle_result(Result, Request, QueryMode, InstanceId). -spec do_send_requests_async( state(), @@ -171,7 +176,7 @@ do_send_requests_sync(State, Requests, InstanceId) -> {ReplyFun :: function(), Args :: list()}, resource_id() ) -> {ok, pid()}. -do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) -> +do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) -> #{connector_state := ConnectorState} = State, Payloads = lists:map( @@ -184,6 +189,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) -> Path = publish_path(State), Method = post, Request = {prepared_request, {Method, Path, Body}}, + ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, emqx_bridge_gcp_pubsub_connector:on_query_async( InstanceId, Request, ReplyFunAndArgs, ConnectorState ). @@ -209,3 +215,71 @@ publish_path( } ) -> <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. + +handle_result({error, Reason}, _Request, QueryMode, ResourceId) when + Reason =:= econnrefused; + %% this comes directly from `gun'... + Reason =:= {closed, "The connection was lost."}; + Reason =:= timeout +-> + ?tp( + warning, + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => QueryMode, + recoverable_error => true, + connector => ResourceId + } + ), + {error, {recoverable_error, Reason}}; +handle_result( + {error, #{status_code := StatusCode, body := RespBody}} = Result, + Request, + _QueryMode, + ResourceId +) -> + ?SLOG(error, #{ + msg => "gcp_pubsub_error_response", + request => emqx_connector_http:redact_request(Request), + connector => ResourceId, + status_code => StatusCode, + resp_body => RespBody + }), + Result; +handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) -> + ?SLOG(error, #{ + msg => "gcp_pubsub_error_response", + request => emqx_connector_http:redact_request(Request), + connector => ResourceId, + status_code => StatusCode + }), + Result; +handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) -> + ?tp( + error, + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => QueryMode, + recoverable_error => false, + connector => ResourceId + } + ), + Result; +handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) -> + Result. + +reply_delegator(ReplyFunAndArgs, Response) -> + case Response of + {error, Reason} when + Reason =:= econnrefused; + %% this comes directly from `gun'... + Reason =:= {closed, "The connection was lost."}; + Reason =:= timeout + -> + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + _ -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response) + end. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl similarity index 99% rename from apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl rename to apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 29849db41..2b9584432 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_gcp_pubsub_SUITE). +-module(emqx_bridge_gcp_pubsub_producer_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -1071,7 +1071,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> #{ ?snk_kind := gcp_pubsub_request_failed, query_mode := async, - recoverable_error := true + reason := econnrefused }, 15_000 ); @@ -1307,13 +1307,13 @@ t_unrecoverable_error(Config) -> {_, {ok, _}} = ?wait_async_action( emqx:publish(Message), - #{?snk_kind := gcp_pubsub_response}, + #{?snk_kind := gcp_pubsub_request_failed}, 5_000 ), fun(Trace) -> ?assertMatch( - [#{response := {error, killed}}], - ?of_kind(gcp_pubsub_response, Trace) + [#{reason := killed}], + ?of_kind(gcp_pubsub_request_failed, Trace) ), ok end