From b69fc9af15475748cbef61435c4ed1681d6a91a6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 9 Jan 2024 16:45:08 -0300 Subject: [PATCH] fix(gcp_pubsub_producer): fix connector `resource_opts` schema Fixes https://emqx.atlassian.net/browse/EMQX-11703 --- .../src/emqx_bridge_gcp_pubsub_client.erl | 54 ++++++++--------- ...emqx_bridge_gcp_pubsub_consumer_worker.erl | 28 ++++++--- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 22 +++---- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 59 +++++++++++++------ ...emqx_bridge_gcp_pubsub_producer_schema.erl | 6 +- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 32 ++++++---- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 27 +++++---- changes/ee/breaking-12283.en.md | 2 + 8 files changed, 142 insertions(+), 88 deletions(-) create mode 100644 changes/ee/breaking-12283.en.md diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index 7091bbf8f..17bc10c4b 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -21,7 +21,7 @@ ]). -export([reply_delegator/3]). --export([get_topic/2]). +-export([get_topic/3]). -export([get_jwt_authorization_header/1]). @@ -31,7 +31,7 @@ -type config() :: #{ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), - resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, + resource_opts := #{atom() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -40,8 +40,7 @@ jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), pool_name := binary(), - project_id := project_id(), - request_ttl := erlang:timeout() + project_id := project_id() }. -type headers() :: [{binary(), iodata()}]. -type body() :: iodata(). @@ -49,6 +48,7 @@ -type method() :: get | post | put | patch. -type path() :: binary(). -type prepared_request() :: {method(), path(), body()}. +-type request_opts() :: #{request_ttl := emqx_schema:duration_ms() | infinity}. -type topic() :: binary(). -export_type([ @@ -73,8 +73,7 @@ start( #{ connect_timeout := ConnectTimeout, max_retries := MaxRetries, - pool_size := PoolSize, - resource_opts := #{request_ttl := RequestTTL} + pool_size := PoolSize } = Config ) -> {Transport, HostPort} = get_transport(), @@ -106,8 +105,7 @@ start( jwt_config => JWTConfig, max_retries => MaxRetries, pool_name => ResourceId, - project_id => ProjectId, - request_ttl => RequestTTL + project_id => ProjectId }, ?tp( gcp_pubsub_on_start_before_starting_pool, @@ -151,26 +149,26 @@ stop(ResourceId) -> end. -spec query_sync( - {prepared_request, prepared_request()}, + {prepared_request, prepared_request(), request_opts()}, state() ) -> {ok, map()} | {error, {recoverable_error, term()} | term()}. -query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) -> +query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts}, State) -> PoolName = maps:get(pool_name, State), ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", #{requests => PreparedRequest, connector => PoolName, state => State} ), - do_send_requests_sync(State, {prepared_request, PreparedRequest}). + do_send_requests_sync(State, {prepared_request, PreparedRequest, ReqOpts}). -spec query_async( - {prepared_request, prepared_request()}, + {prepared_request, prepared_request(), request_opts()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()} | {error, no_pool_worker_available}. query_async( - {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, + {prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts}, ReplyFunAndArgs, State ) -> @@ -180,7 +178,7 @@ query_async( "gcp_pubsub_received", #{requests => PreparedRequest, connector => PoolName, state => State} ), - do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs). + do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs). -spec get_status(state()) -> connected | disconnected. get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> @@ -199,13 +197,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> %% API %%------------------------------------------------------------------------------------------------- --spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}. -get_topic(Topic, ConnectorState) -> +-spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}. +get_topic(Topic, ConnectorState, ReqOpts) -> #{project_id := ProjectId} = ConnectorState, Method = get, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, - PreparedRequest = {prepared_request, {Method, Path, Body}}, + PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, ?MODULE:query_sync(PreparedRequest, ConnectorState). %%------------------------------------------------------------------------------------------------- @@ -277,19 +275,19 @@ get_jwt_authorization_header(JWTConfig) -> -spec do_send_requests_sync( state(), - {prepared_request, prepared_request()} + {prepared_request, prepared_request(), request_opts()} ) -> {ok, map()} | {error, {recoverable_error, term()} | term()}. -do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) -> +do_send_requests_sync(State, {prepared_request, {Method, Path, Body}, ReqOpts}) -> #{ pool_name := PoolName, - max_retries := MaxRetries, - request_ttl := RequestTTL + max_retries := MaxRetries } = State, + #{request_ttl := RequestTTL} = ReqOpts, ?tp( gcp_pubsub_bridge_do_send_requests, #{ - request => {prepared_request, {Method, Path, Body}}, + request => {prepared_request, {Method, Path, Body}, ReqOpts}, query_mode => sync, resource_id => PoolName } @@ -306,20 +304,18 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) -> -spec do_send_requests_async( state(), - {prepared_request, prepared_request()}, + {prepared_request, prepared_request(), request_opts()}, {ReplyFun :: function(), Args :: list()} ) -> {ok, pid()} | {error, no_pool_worker_available}. do_send_requests_async( - State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs + State, {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs ) -> - #{ - pool_name := PoolName, - request_ttl := RequestTTL - } = State, + #{pool_name := PoolName} = State, + #{request_ttl := RequestTTL} = ReqOpts, ?tp( gcp_pubsub_bridge_do_send_requests, #{ - request => {prepared_request, {Method, Path, Body}}, + request => {prepared_request, {Method, Path, Body}, ReqOpts}, query_mode => async, resource_id => PoolName } diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index f860e3635..c25e7e1ea 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -43,6 +43,7 @@ project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), pull_retry_interval := emqx_schema:timeout_duration_ms(), + request_ttl := emqx_schema:duration_ms() | infinity, subscription_id => subscription_id(), topic => emqx_bridge_gcp_pubsub_client:topic() }. @@ -62,6 +63,7 @@ pull_max_messages := non_neg_integer(), pull_retry_interval := emqx_schema:timeout_duration_ms(), pull_timer := undefined | reference(), + request_ttl := emqx_schema:duration_ms() | infinity, %% In order to avoid re-processing the same message twice due to race conditions %% between acknlowledging a message and receiving a duplicate pulled message, we need %% to keep the seen message IDs for a while... @@ -159,6 +161,7 @@ connect(Opts0) -> project_id := ProjectId, pull_max_messages := PullMaxMessages, pull_retry_interval := PullRetryInterval, + request_ttl := RequestTTL, topic_mapping := TopicMapping } = Opts, TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)), @@ -178,6 +181,7 @@ connect(Opts0) -> project_id => ProjectId, pull_max_messages => PullMaxMessages, pull_retry_interval => PullRetryInterval, + request_ttl => RequestTTL, topic => Topic, subscription_id => subscription_id(BridgeName, Topic) }, @@ -348,13 +352,15 @@ ensure_subscription_exists(State) -> #{ client := Client, instance_id := InstanceId, + request_ttl := RequestTTL, subscription_id := SubscriptionId, topic := Topic } = State, Method = put, Path = path(State, create), Body = body(State, create), - PreparedRequest = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, #{status_code := 409}} -> @@ -432,12 +438,14 @@ patch_subscription(State) -> client := Client, instance_id := InstanceId, subscription_id := SubscriptionId, + request_ttl := RequestTTL, topic := Topic } = State, Method1 = patch, Path1 = path(State, create), Body1 = body(State, patch_subscription), - PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}}, + ReqOpts = #{request_ttl => RequestTTL}, + PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}, ReqOpts}, Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client), case Res of {ok, _} -> @@ -475,12 +483,14 @@ do_pull_async(State0) -> begin #{ client := Client, - instance_id := InstanceId + instance_id := InstanceId, + request_ttl := RequestTTL } = State0, Method = post, Path = path(State0, pull), Body = body(State0, pull), - PreparedRequest = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, Res = emqx_bridge_gcp_pubsub_client:query_async( PreparedRequest, @@ -559,13 +569,15 @@ do_acknowledge(State0) -> #{ client := Client, forget_interval := ForgetInterval, + request_ttl := RequestTTL, pending_acks := PendingAcks } = State1, AckIds = maps:values(PendingAcks), Method = post, Path = path(State1, ack), Body = body(State1, ack, #{ack_ids => AckIds}), - PreparedRequest = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of @@ -593,12 +605,14 @@ do_acknowledge(State0) -> -spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}. do_get_subscription(State) -> #{ - client := Client + client := Client, + request_ttl := RequestTTL } = State, Method = get, Path = path(State, get_subscription), Body = body(State, get_subscription), - PreparedRequest = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 5c726ef9b..bcc5c818e 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -160,10 +160,12 @@ start_consumers(InstanceId, Client, Config) -> instance_id => InstanceId, pool_size => PoolSize, project_id => ProjectId, - pull_retry_interval => RequestTTL + pull_retry_interval => RequestTTL, + request_ttl => RequestTTL }, ConsumerOpts = maps:to_list(ConsumerConfig), - case validate_pubsub_topics(TopicMapping, Client) of + ReqOpts = #{request_ttl => RequestTTL}, + case validate_pubsub_topics(TopicMapping, Client, ReqOpts) of ok -> ok; {error, not_found} -> @@ -235,23 +237,23 @@ convert_topic_mapping(TopicMappingList) -> TopicMappingList ). -validate_pubsub_topics(TopicMapping, Client) -> +validate_pubsub_topics(TopicMapping, Client, ReqOpts) -> PubSubTopics = maps:keys(TopicMapping), - do_validate_pubsub_topics(Client, PubSubTopics). + do_validate_pubsub_topics(Client, PubSubTopics, ReqOpts). -do_validate_pubsub_topics(Client, [Topic | Rest]) -> - case check_for_topic_existence(Topic, Client) of +do_validate_pubsub_topics(Client, [Topic | Rest], ReqOpts) -> + case check_for_topic_existence(Topic, Client, ReqOpts) of ok -> - do_validate_pubsub_topics(Client, Rest); + do_validate_pubsub_topics(Client, Rest, ReqOpts); {error, _} = Err -> Err end; -do_validate_pubsub_topics(_Client, []) -> +do_validate_pubsub_topics(_Client, [], _ReqOpts) -> %% we already validate that the mapping is not empty in the config schema. ok. -check_for_topic_existence(Topic, Client) -> - Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client), +check_for_topic_existence(Topic, Client, ReqOpts) -> + Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client, ReqOpts), case Res of {ok, _} -> ok; diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index b75940227..e0d20cd34 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -32,7 +32,8 @@ attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, ordering_key_template := emqx_placeholder:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(), - pubsub_topic := binary() + pubsub_topic := binary(), + request_ttl := infinity | emqx_schema:duration_ms() }. -type headers() :: emqx_bridge_gcp_pubsub_client:headers(). -type body() :: emqx_bridge_gcp_pubsub_client:body(). @@ -102,14 +103,18 @@ on_get_status(_InstanceId, #{client := Client} = _State) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {MessageTag, Selected}, State) -> +on_query(ResourceId, {MessageTag, Selected}, ConnectorState) -> Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{ + requests => Requests, + connector => ResourceId, + state => emqx_utils:redact(ConnectorState) + } ), - do_send_requests_sync(State, Requests, ResourceId). + do_send_requests_sync(ConnectorState, Requests, ResourceId). -spec on_query_async( connector_resource_id(), @@ -117,15 +122,19 @@ on_query(ResourceId, {MessageTag, Selected}, State) -> {ReplyFun :: function(), Args :: list()}, connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. -on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) -> +on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) -> Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{ + requests => Requests, + connector => ResourceId, + state => emqx_utils:redact(ConnectorState) + } ), ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), - do_send_requests_async(State, Requests, ReplyFunAndArgs). + do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs). -spec on_batch_query( connector_resource_id(), @@ -135,13 +144,17 @@ on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(ResourceId, Requests, State) -> +on_batch_query(ResourceId, Requests, ConnectorState) -> ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{ + requests => Requests, + connector => ResourceId, + state => emqx_utils:redact(ConnectorState) + } ), - do_send_requests_sync(State, Requests, ResourceId). + do_send_requests_sync(ConnectorState, Requests, ResourceId). -spec on_batch_query_async( connector_resource_id(), @@ -149,14 +162,18 @@ on_batch_query(ResourceId, Requests, State) -> {ReplyFun :: function(), Args :: list()}, connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. -on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> +on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) -> ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{ + requests => Requests, + connector => ResourceId, + state => emqx_utils:redact(ConnectorState) + } ), ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), - do_send_requests_async(State, Requests, ReplyFunAndArgs). + do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs). -spec on_add_channel( connector_resource_id(), @@ -207,13 +224,17 @@ install_channel(ActionConfig) -> ordering_key_template := OrderingKeyTemplate, payload_template := PayloadTemplate, pubsub_topic := PubSubTopic + }, + resource_opts := #{ + request_ttl := RequestTTL } } = ActionConfig, #{ attributes_template => preproc_attributes(AttributesTemplate), ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), - pubsub_topic => PubSubTopic + pubsub_topic => PubSubTopic, + request_ttl => RequestTTL }. -spec do_send_requests_sync( @@ -231,7 +252,7 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) -> %% is it safe to assume the tag is the same??? And not empty??? [{MessageTag, _} | _] = Requests, #{installed_actions := InstalledActions} = ConnectorState, - ChannelState = maps:get(MessageTag, InstalledActions), + ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions), Payloads = lists:map( fun({_MessageTag, Selected}) -> @@ -242,7 +263,8 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) -> Body = to_pubsub_request(Payloads), Path = publish_path(ConnectorState, ChannelState), Method = post, - Request = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + Request = {prepared_request, {Method, Path, Body}, ReqOpts}, Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), QueryMode = sync, handle_result(Result, Request, QueryMode, InstanceId). @@ -257,7 +279,7 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> %% is it safe to assume the tag is the same??? And not empty??? [{MessageTag, _} | _] = Requests, #{installed_actions := InstalledActions} = ConnectorState, - ChannelState = maps:get(MessageTag, InstalledActions), + ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions), Payloads = lists:map( fun({_MessageTag, Selected}) -> @@ -268,7 +290,8 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> Body = to_pubsub_request(Payloads), Path = publish_path(ConnectorState, ChannelState), Method = post, - Request = {prepared_request, {Method, Path, Body}}, + ReqOpts = #{request_ttl => RequestTTL}, + Request = {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, emqx_bridge_gcp_pubsub_client:query_async( Request, ReplyFunAndArgs, Client diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl index a88715409..eec7bff5b 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl @@ -80,6 +80,8 @@ fields("config_connector") -> %% FIXME emqx_connector_schema:common_fields() ++ connector_config_fields(); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); %%========================================= %% HTTP API fields: action %%========================================= @@ -101,7 +103,7 @@ fields(Field) when connector_config_fields() -> emqx_bridge_gcp_pubsub:fields(connector_config) ++ - emqx_resource_schema:fields("resource_opts"). + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). desc("config_connector") -> ?DESC("config_connector"); @@ -109,6 +111,8 @@ desc(action_parameters) -> ?DESC(action_parameters); desc(producer_action) -> ?DESC(producer_action); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_Name) -> undefined. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index a35a3eecc..4b179b050 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -14,6 +14,12 @@ -define(BRIDGE_TYPE, gcp_pubsub_consumer). -define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>). -define(REPUBLISH_TOPIC, <<"republish/t">>). +-define(PREPARED_REQUEST(METHOD, PATH, BODY), + {prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}} +). +-define(PREPARED_REQUEST_PAT(METHOD, PATH, BODY), + {prepared_request, {METHOD, PATH, BODY}, _} +). -import(emqx_common_test_helpers, [on_exit/1]). @@ -40,11 +46,13 @@ init_per_suite(Config) -> emqx_conf, emqx_bridge_gcp_pubsub, emqx_bridge, - emqx_rule_engine + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - emqx_mgmt_api_test_util:init_suite(), + {ok, _Api} = emqx_common_test_http:create_default_app(), HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr, true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), Client = start_control_client(), @@ -71,7 +79,6 @@ end_per_suite(Config) -> Apps = ?config(apps, Config), Client = ?config(client, Config), stop_control_client(Client), - emqx_mgmt_api_test_util:end_suite(), emqx_cth_suite:stop(Apps), os:unsetenv("PUBSUB_EMULATOR_HOST"), ok. @@ -266,7 +273,7 @@ ensure_topic(Config, Topic) -> Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<"{}">>, Res = emqx_bridge_gcp_pubsub_client:query_sync( - {prepared_request, {Method, Path, Body}}, + ?PREPARED_REQUEST(Method, Path, Body), Client ), case Res of @@ -285,7 +292,6 @@ start_control_client() -> connect_timeout => 5_000, max_retries => 0, pool_size => 1, - resource_opts => #{request_ttl => 1_000}, service_account_json => RawServiceAccount }, PoolName = <<"control_connector">>, @@ -317,7 +323,7 @@ pubsub_publish(Config, Topic, Messages0) -> ), Body = emqx_utils_json:encode(#{<<"messages">> => Messages}), {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( - {prepared_request, {Method, Path, Body}}, + ?PREPARED_REQUEST(Method, Path, Body), Client ), ok. @@ -329,7 +335,7 @@ delete_topic(Config, Topic) -> Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( - {prepared_request, {Method, Path, Body}}, + ?PREPARED_REQUEST(Method, Path, Body), Client ), ok. @@ -341,7 +347,7 @@ delete_subscription(Config, SubscriptionId) -> Path = <<"/v1/projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>, Body = <<>>, {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( - {prepared_request, {Method, Path, Body}}, + ?PREPARED_REQUEST(Method, Path, Body), Client ), ok. @@ -1994,7 +2000,7 @@ t_connection_down_during_ack_redeliver(Config) -> emqx_common_test_helpers:with_mock( emqx_bridge_gcp_pubsub_client, query_sync, - fun(PreparedRequest = {prepared_request, {_Method, Path, _Body}}, Client) -> + fun(PreparedRequest = ?PREPARED_REQUEST_PAT(_Method, Path, _Body), Client) -> case re:run(Path, <<":acknowledge$">>) of {match, _} -> ct:sleep(800), @@ -2162,7 +2168,7 @@ t_permission_denied_topic_check(Config) -> emqx_common_test_helpers:with_mock( emqx_bridge_gcp_pubsub_client, query_sync, - fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> + fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) -> RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), case {Method =:= get, re:run(Path, RE)} of {true, {match, _}} -> @@ -2201,7 +2207,7 @@ t_permission_denied_worker(Config) -> emqx_common_test_helpers:with_mock( emqx_bridge_gcp_pubsub_client, query_sync, - fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> + fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) -> case Method =:= put of true -> permission_denied_response(); @@ -2237,7 +2243,7 @@ t_unauthenticated_topic_check(Config) -> emqx_common_test_helpers:with_mock( emqx_bridge_gcp_pubsub_client, query_sync, - fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> + fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) -> RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), case {Method =:= get, re:run(Path, RE)} of {true, {match, _}} -> @@ -2276,7 +2282,7 @@ t_unauthenticated_worker(Config) -> emqx_common_test_helpers:with_mock( emqx_bridge_gcp_pubsub_client, query_sync, - fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> + fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) -> case Method =:= put of true -> unauthenticated_response(); diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 2976fefb2..bb9ad7cda 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -76,18 +76,25 @@ only_sync_tests() -> [t_query_sync]. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), - {ok, _} = application:ensure_all_started(emqx_connector), - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_gcp_pubsub, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls), - Config. + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), - _ = application:stop(emqx_connector), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}), ok. diff --git a/changes/ee/breaking-12283.en.md b/changes/ee/breaking-12283.en.md new file mode 100644 index 000000000..39d92fb03 --- /dev/null +++ b/changes/ee/breaking-12283.en.md @@ -0,0 +1,2 @@ +Fixed the `resource_opts` configuration schema for the GCP PubSub Producer connector so that it contains only relevant fields. +This affects the creation of GCP PubSub Producer connectors via HOCON configuration (`connectors.gcp_pubsub_producer.*.resource_opts`) and the HTTP APIs `POST /connectors` / `PUT /connectors/:id` for this particular connector type.