Merge pull request #12283 from thalesmg/fix-gcp-pubsub-produ-resopts-m-20240109

fix(gcp_pubsub_producer): fix connector `resource_opts` schema
This commit is contained in:
Thales Macedo Garitezi 2024-01-10 11:08:39 -03:00 committed by GitHub
commit b395924569
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 142 additions and 88 deletions

View File

@ -21,7 +21,7 @@
]).
-export([reply_delegator/3]).
-export([get_topic/2]).
-export([get_topic/3]).
-export([get_jwt_authorization_header/1]).
@ -31,7 +31,7 @@
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
resource_opts := #{atom() => term()},
service_account_json := service_account_json(),
any() => term()
}.
@ -40,8 +40,7 @@
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
pool_name := binary(),
project_id := project_id(),
request_ttl := erlang:timeout()
project_id := project_id()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
@ -49,6 +48,7 @@
-type method() :: get | post | put | patch.
-type path() :: binary().
-type prepared_request() :: {method(), path(), body()}.
-type request_opts() :: #{request_ttl := emqx_schema:duration_ms() | infinity}.
-type topic() :: binary().
-export_type([
@ -73,8 +73,7 @@ start(
#{
connect_timeout := ConnectTimeout,
max_retries := MaxRetries,
pool_size := PoolSize,
resource_opts := #{request_ttl := RequestTTL}
pool_size := PoolSize
} = Config
) ->
{Transport, HostPort} = get_transport(),
@ -106,8 +105,7 @@ start(
jwt_config => JWTConfig,
max_retries => MaxRetries,
pool_name => ResourceId,
project_id => ProjectId,
request_ttl => RequestTTL
project_id => ProjectId
},
?tp(
gcp_pubsub_on_start_before_starting_pool,
@ -151,26 +149,26 @@ stop(ResourceId) ->
end.
-spec query_sync(
{prepared_request, prepared_request()},
{prepared_request, prepared_request(), request_opts()},
state()
) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}.
query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts}, State) ->
PoolName = maps:get(pool_name, State),
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => PreparedRequest, connector => PoolName, state => State}
),
do_send_requests_sync(State, {prepared_request, PreparedRequest}).
do_send_requests_sync(State, {prepared_request, PreparedRequest, ReqOpts}).
-spec query_async(
{prepared_request, prepared_request()},
{prepared_request, prepared_request(), request_opts()},
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()} | {error, no_pool_worker_available}.
query_async(
{prepared_request, PreparedRequest = {_Method, _Path, _Body}},
{prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts},
ReplyFunAndArgs,
State
) ->
@ -180,7 +178,7 @@ query_async(
"gcp_pubsub_received",
#{requests => PreparedRequest, connector => PoolName, state => State}
),
do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs).
do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs).
-spec get_status(state()) -> connected | disconnected.
get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
@ -199,13 +197,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
%% API
%%-------------------------------------------------------------------------------------------------
-spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}.
get_topic(Topic, ConnectorState) ->
-spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}.
get_topic(Topic, ConnectorState, ReqOpts) ->
#{project_id := ProjectId} = ConnectorState,
Method = get,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>,
PreparedRequest = {prepared_request, {Method, Path, Body}},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
?MODULE:query_sync(PreparedRequest, ConnectorState).
%%-------------------------------------------------------------------------------------------------
@ -277,19 +275,19 @@ get_jwt_authorization_header(JWTConfig) ->
-spec do_send_requests_sync(
state(),
{prepared_request, prepared_request()}
{prepared_request, prepared_request(), request_opts()}
) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}.
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}, ReqOpts}) ->
#{
pool_name := PoolName,
max_retries := MaxRetries,
request_ttl := RequestTTL
max_retries := MaxRetries
} = State,
#{request_ttl := RequestTTL} = ReqOpts,
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
request => {prepared_request, {Method, Path, Body}},
request => {prepared_request, {Method, Path, Body}, ReqOpts},
query_mode => sync,
resource_id => PoolName
}
@ -306,20 +304,18 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
-spec do_send_requests_async(
state(),
{prepared_request, prepared_request()},
{prepared_request, prepared_request(), request_opts()},
{ReplyFun :: function(), Args :: list()}
) -> {ok, pid()} | {error, no_pool_worker_available}.
do_send_requests_async(
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs
State, {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs
) ->
#{
pool_name := PoolName,
request_ttl := RequestTTL
} = State,
#{pool_name := PoolName} = State,
#{request_ttl := RequestTTL} = ReqOpts,
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
request => {prepared_request, {Method, Path, Body}},
request => {prepared_request, {Method, Path, Body}, ReqOpts},
query_mode => async,
resource_id => PoolName
}

View File

@ -43,6 +43,7 @@
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pull_max_messages := non_neg_integer(),
pull_retry_interval := emqx_schema:timeout_duration_ms(),
request_ttl := emqx_schema:duration_ms() | infinity,
subscription_id => subscription_id(),
topic => emqx_bridge_gcp_pubsub_client:topic()
}.
@ -62,6 +63,7 @@
pull_max_messages := non_neg_integer(),
pull_retry_interval := emqx_schema:timeout_duration_ms(),
pull_timer := undefined | reference(),
request_ttl := emqx_schema:duration_ms() | infinity,
%% In order to avoid re-processing the same message twice due to race conditions
%% between acknlowledging a message and receiving a duplicate pulled message, we need
%% to keep the seen message IDs for a while...
@ -159,6 +161,7 @@ connect(Opts0) ->
project_id := ProjectId,
pull_max_messages := PullMaxMessages,
pull_retry_interval := PullRetryInterval,
request_ttl := RequestTTL,
topic_mapping := TopicMapping
} = Opts,
TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
@ -178,6 +181,7 @@ connect(Opts0) ->
project_id => ProjectId,
pull_max_messages => PullMaxMessages,
pull_retry_interval => PullRetryInterval,
request_ttl => RequestTTL,
topic => Topic,
subscription_id => subscription_id(BridgeName, Topic)
},
@ -348,13 +352,15 @@ ensure_subscription_exists(State) ->
#{
client := Client,
instance_id := InstanceId,
request_ttl := RequestTTL,
subscription_id := SubscriptionId,
topic := Topic
} = State,
Method = put,
Path = path(State, create),
Body = body(State, create),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
{error, #{status_code := 409}} ->
@ -432,12 +438,14 @@ patch_subscription(State) ->
client := Client,
instance_id := InstanceId,
subscription_id := SubscriptionId,
request_ttl := RequestTTL,
topic := Topic
} = State,
Method1 = patch,
Path1 = path(State, create),
Body1 = body(State, patch_subscription),
PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
case Res of
{ok, _} ->
@ -475,12 +483,14 @@ do_pull_async(State0) ->
begin
#{
client := Client,
instance_id := InstanceId
instance_id := InstanceId,
request_ttl := RequestTTL
} = State0,
Method = post,
Path = path(State0, pull),
Body = body(State0, pull),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
Res = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest,
@ -559,13 +569,15 @@ do_acknowledge(State0) ->
#{
client := Client,
forget_interval := ForgetInterval,
request_ttl := RequestTTL,
pending_acks := PendingAcks
} = State1,
AckIds = maps:values(PendingAcks),
Method = post,
Path = path(State1, ack),
Body = body(State1, ack, #{ack_ids => AckIds}),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
@ -593,12 +605,14 @@ do_acknowledge(State0) ->
-spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}.
do_get_subscription(State) ->
#{
client := Client
client := Client,
request_ttl := RequestTTL
} = State,
Method = get,
Path = path(State, get_subscription),
Body = body(State, get_subscription),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
{error, Reason} ->

View File

@ -160,10 +160,12 @@ start_consumers(InstanceId, Client, Config) ->
instance_id => InstanceId,
pool_size => PoolSize,
project_id => ProjectId,
pull_retry_interval => RequestTTL
pull_retry_interval => RequestTTL,
request_ttl => RequestTTL
},
ConsumerOpts = maps:to_list(ConsumerConfig),
case validate_pubsub_topics(TopicMapping, Client) of
ReqOpts = #{request_ttl => RequestTTL},
case validate_pubsub_topics(TopicMapping, Client, ReqOpts) of
ok ->
ok;
{error, not_found} ->
@ -235,23 +237,23 @@ convert_topic_mapping(TopicMappingList) ->
TopicMappingList
).
validate_pubsub_topics(TopicMapping, Client) ->
validate_pubsub_topics(TopicMapping, Client, ReqOpts) ->
PubSubTopics = maps:keys(TopicMapping),
do_validate_pubsub_topics(Client, PubSubTopics).
do_validate_pubsub_topics(Client, PubSubTopics, ReqOpts).
do_validate_pubsub_topics(Client, [Topic | Rest]) ->
case check_for_topic_existence(Topic, Client) of
do_validate_pubsub_topics(Client, [Topic | Rest], ReqOpts) ->
case check_for_topic_existence(Topic, Client, ReqOpts) of
ok ->
do_validate_pubsub_topics(Client, Rest);
do_validate_pubsub_topics(Client, Rest, ReqOpts);
{error, _} = Err ->
Err
end;
do_validate_pubsub_topics(_Client, []) ->
do_validate_pubsub_topics(_Client, [], _ReqOpts) ->
%% we already validate that the mapping is not empty in the config schema.
ok.
check_for_topic_existence(Topic, Client) ->
Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client),
check_for_topic_existence(Topic, Client, ReqOpts) ->
Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client, ReqOpts),
case Res of
{ok, _} ->
ok;

View File

@ -32,7 +32,8 @@
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
ordering_key_template := emqx_placeholder:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
pubsub_topic := binary()
pubsub_topic := binary(),
request_ttl := infinity | emqx_schema:duration_ms()
}.
-type headers() :: emqx_bridge_gcp_pubsub_client:headers().
-type body() :: emqx_bridge_gcp_pubsub_client:body().
@ -102,14 +103,18 @@ on_get_status(_InstanceId, #{client := Client} = _State) ->
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(ResourceId, {MessageTag, Selected}, State) ->
on_query(ResourceId, {MessageTag, Selected}, ConnectorState) ->
Requests = [{MessageTag, Selected}],
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
),
do_send_requests_sync(State, Requests, ResourceId).
do_send_requests_sync(ConnectorState, Requests, ResourceId).
-spec on_query_async(
connector_resource_id(),
@ -117,15 +122,19 @@ on_query(ResourceId, {MessageTag, Selected}, State) ->
{ReplyFun :: function(), Args :: list()},
connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}.
on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) ->
on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) ->
Requests = [{MessageTag, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(State, Requests, ReplyFunAndArgs).
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
-spec on_batch_query(
connector_resource_id(),
@ -135,13 +144,17 @@ on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) ->
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(ResourceId, Requests, State) ->
on_batch_query(ResourceId, Requests, ConnectorState) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
),
do_send_requests_sync(State, Requests, ResourceId).
do_send_requests_sync(ConnectorState, Requests, ResourceId).
-spec on_batch_query_async(
connector_resource_id(),
@ -149,14 +162,18 @@ on_batch_query(ResourceId, Requests, State) ->
{ReplyFun :: function(), Args :: list()},
connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}.
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(State, Requests, ReplyFunAndArgs).
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
-spec on_add_channel(
connector_resource_id(),
@ -207,13 +224,17 @@ install_channel(ActionConfig) ->
ordering_key_template := OrderingKeyTemplate,
payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic
},
resource_opts := #{
request_ttl := RequestTTL
}
} = ActionConfig,
#{
attributes_template => preproc_attributes(AttributesTemplate),
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pubsub_topic => PubSubTopic
pubsub_topic => PubSubTopic,
request_ttl => RequestTTL
}.
-spec do_send_requests_sync(
@ -231,7 +252,7 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
%% is it safe to assume the tag is the same??? And not empty???
[{MessageTag, _} | _] = Requests,
#{installed_actions := InstalledActions} = ConnectorState,
ChannelState = maps:get(MessageTag, InstalledActions),
ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions),
Payloads =
lists:map(
fun({_MessageTag, Selected}) ->
@ -242,7 +263,8 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
Body = to_pubsub_request(Payloads),
Path = publish_path(ConnectorState, ChannelState),
Method = post,
Request = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
Request = {prepared_request, {Method, Path, Body}, ReqOpts},
Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
QueryMode = sync,
handle_result(Result, Request, QueryMode, InstanceId).
@ -257,7 +279,7 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
%% is it safe to assume the tag is the same??? And not empty???
[{MessageTag, _} | _] = Requests,
#{installed_actions := InstalledActions} = ConnectorState,
ChannelState = maps:get(MessageTag, InstalledActions),
ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions),
Payloads =
lists:map(
fun({_MessageTag, Selected}) ->
@ -268,7 +290,8 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
Body = to_pubsub_request(Payloads),
Path = publish_path(ConnectorState, ChannelState),
Method = post,
Request = {prepared_request, {Method, Path, Body}},
ReqOpts = #{request_ttl => RequestTTL},
Request = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
emqx_bridge_gcp_pubsub_client:query_async(
Request, ReplyFunAndArgs, Client

View File

@ -80,6 +80,8 @@ fields("config_connector") ->
%% FIXME
emqx_connector_schema:common_fields() ++
connector_config_fields();
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
%%=========================================
%% HTTP API fields: action
%%=========================================
@ -101,7 +103,7 @@ fields(Field) when
connector_config_fields() ->
emqx_bridge_gcp_pubsub:fields(connector_config) ++
emqx_resource_schema:fields("resource_opts").
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).
desc("config_connector") ->
?DESC("config_connector");
@ -109,6 +111,8 @@ desc(action_parameters) ->
?DESC(action_parameters);
desc(producer_action) ->
?DESC(producer_action);
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) ->
undefined.

View File

@ -14,6 +14,12 @@
-define(BRIDGE_TYPE, gcp_pubsub_consumer).
-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
-define(REPUBLISH_TOPIC, <<"republish/t">>).
-define(PREPARED_REQUEST(METHOD, PATH, BODY),
{prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}}
).
-define(PREPARED_REQUEST_PAT(METHOD, PATH, BODY),
{prepared_request, {METHOD, PATH, BODY}, _}
).
-import(emqx_common_test_helpers, [on_exit/1]).
@ -40,11 +46,13 @@ init_per_suite(Config) ->
emqx_conf,
emqx_bridge_gcp_pubsub,
emqx_bridge,
emqx_rule_engine
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_mgmt_api_test_util:init_suite(),
{ok, _Api} = emqx_common_test_http:create_default_app(),
HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
Client = start_control_client(),
@ -71,7 +79,6 @@ end_per_suite(Config) ->
Apps = ?config(apps, Config),
Client = ?config(client, Config),
stop_control_client(Client),
emqx_mgmt_api_test_util:end_suite(),
emqx_cth_suite:stop(Apps),
os:unsetenv("PUBSUB_EMULATOR_HOST"),
ok.
@ -266,7 +273,7 @@ ensure_topic(Config, Topic) ->
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<"{}">>,
Res = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
?PREPARED_REQUEST(Method, Path, Body),
Client
),
case Res of
@ -285,7 +292,6 @@ start_control_client() ->
connect_timeout => 5_000,
max_retries => 0,
pool_size => 1,
resource_opts => #{request_ttl => 1_000},
service_account_json => RawServiceAccount
},
PoolName = <<"control_connector">>,
@ -317,7 +323,7 @@ pubsub_publish(Config, Topic, Messages0) ->
),
Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
?PREPARED_REQUEST(Method, Path, Body),
Client
),
ok.
@ -329,7 +335,7 @@ delete_topic(Config, Topic) ->
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>,
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
?PREPARED_REQUEST(Method, Path, Body),
Client
),
ok.
@ -341,7 +347,7 @@ delete_subscription(Config, SubscriptionId) ->
Path = <<"/v1/projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>,
Body = <<>>,
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
?PREPARED_REQUEST(Method, Path, Body),
Client
),
ok.
@ -1994,7 +2000,7 @@ t_connection_down_during_ack_redeliver(Config) ->
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client,
query_sync,
fun(PreparedRequest = {prepared_request, {_Method, Path, _Body}}, Client) ->
fun(PreparedRequest = ?PREPARED_REQUEST_PAT(_Method, Path, _Body), Client) ->
case re:run(Path, <<":acknowledge$">>) of
{match, _} ->
ct:sleep(800),
@ -2162,7 +2168,7 @@ t_permission_denied_topic_check(Config) ->
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client,
query_sync,
fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) ->
fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) ->
RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
case {Method =:= get, re:run(Path, RE)} of
{true, {match, _}} ->
@ -2201,7 +2207,7 @@ t_permission_denied_worker(Config) ->
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client,
query_sync,
fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) ->
fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) ->
case Method =:= put of
true ->
permission_denied_response();
@ -2237,7 +2243,7 @@ t_unauthenticated_topic_check(Config) ->
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client,
query_sync,
fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) ->
fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) ->
RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
case {Method =:= get, re:run(Path, RE)} of
{true, {match, _}} ->
@ -2276,7 +2282,7 @@ t_unauthenticated_worker(Config) ->
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client,
query_sync,
fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) ->
fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) ->
case Method =:= put of
true ->
unauthenticated_response();

View File

@ -76,18 +76,25 @@ only_sync_tests() ->
[t_query_sync].
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_bridge_gcp_pubsub,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls),
Config.
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
_ = application:stop(emqx_connector),
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}),
ok.

View File

@ -0,0 +1,2 @@
Fixed the `resource_opts` configuration schema for the GCP PubSub Producer connector so that it contains only relevant fields.
This affects the creation of GCP PubSub Producer connectors via HOCON configuration (`connectors.gcp_pubsub_producer.*.resource_opts`) and the HTTP APIs `POST /connectors` / `PUT /connectors/:id` for this particular connector type.