fix(gcp_pubsub_producer): fix connector `resource_opts` schema

Fixes https://emqx.atlassian.net/browse/EMQX-11703
This commit is contained in:
Thales Macedo Garitezi 2024-01-09 16:45:08 -03:00
parent cc00dd80ee
commit b69fc9af15
8 changed files with 142 additions and 88 deletions

View File

@ -21,7 +21,7 @@
]). ]).
-export([reply_delegator/3]). -export([reply_delegator/3]).
-export([get_topic/2]). -export([get_topic/3]).
-export([get_jwt_authorization_header/1]). -export([get_jwt_authorization_header/1]).
@ -31,7 +31,7 @@
-type config() :: #{ -type config() :: #{
connect_timeout := emqx_schema:duration_ms(), connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, resource_opts := #{atom() => term()},
service_account_json := service_account_json(), service_account_json := service_account_json(),
any() => term() any() => term()
}. }.
@ -40,8 +40,7 @@
jwt_config := emqx_connector_jwt:jwt_config(), jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
pool_name := binary(), pool_name := binary(),
project_id := project_id(), project_id := project_id()
request_ttl := erlang:timeout()
}. }.
-type headers() :: [{binary(), iodata()}]. -type headers() :: [{binary(), iodata()}].
-type body() :: iodata(). -type body() :: iodata().
@ -49,6 +48,7 @@
-type method() :: get | post | put | patch. -type method() :: get | post | put | patch.
-type path() :: binary(). -type path() :: binary().
-type prepared_request() :: {method(), path(), body()}. -type prepared_request() :: {method(), path(), body()}.
-type request_opts() :: #{request_ttl := emqx_schema:duration_ms() | infinity}.
-type topic() :: binary(). -type topic() :: binary().
-export_type([ -export_type([
@ -73,8 +73,7 @@ start(
#{ #{
connect_timeout := ConnectTimeout, connect_timeout := ConnectTimeout,
max_retries := MaxRetries, max_retries := MaxRetries,
pool_size := PoolSize, pool_size := PoolSize
resource_opts := #{request_ttl := RequestTTL}
} = Config } = Config
) -> ) ->
{Transport, HostPort} = get_transport(), {Transport, HostPort} = get_transport(),
@ -106,8 +105,7 @@ start(
jwt_config => JWTConfig, jwt_config => JWTConfig,
max_retries => MaxRetries, max_retries => MaxRetries,
pool_name => ResourceId, pool_name => ResourceId,
project_id => ProjectId, project_id => ProjectId
request_ttl => RequestTTL
}, },
?tp( ?tp(
gcp_pubsub_on_start_before_starting_pool, gcp_pubsub_on_start_before_starting_pool,
@ -151,26 +149,26 @@ stop(ResourceId) ->
end. end.
-spec query_sync( -spec query_sync(
{prepared_request, prepared_request()}, {prepared_request, prepared_request(), request_opts()},
state() state()
) -> ) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}. {ok, map()} | {error, {recoverable_error, term()} | term()}.
query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) -> query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts}, State) ->
PoolName = maps:get(pool_name, State), PoolName = maps:get(pool_name, State),
?TRACE( ?TRACE(
"QUERY_SYNC", "QUERY_SYNC",
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => PreparedRequest, connector => PoolName, state => State} #{requests => PreparedRequest, connector => PoolName, state => State}
), ),
do_send_requests_sync(State, {prepared_request, PreparedRequest}). do_send_requests_sync(State, {prepared_request, PreparedRequest, ReqOpts}).
-spec query_async( -spec query_async(
{prepared_request, prepared_request()}, {prepared_request, prepared_request(), request_opts()},
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
state() state()
) -> {ok, pid()} | {error, no_pool_worker_available}. ) -> {ok, pid()} | {error, no_pool_worker_available}.
query_async( query_async(
{prepared_request, PreparedRequest = {_Method, _Path, _Body}}, {prepared_request, PreparedRequest = {_Method, _Path, _Body}, ReqOpts},
ReplyFunAndArgs, ReplyFunAndArgs,
State State
) -> ) ->
@ -180,7 +178,7 @@ query_async(
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => PreparedRequest, connector => PoolName, state => State} #{requests => PreparedRequest, connector => PoolName, state => State}
), ),
do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs). do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs).
-spec get_status(state()) -> connected | disconnected. -spec get_status(state()) -> connected | disconnected.
get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
@ -199,13 +197,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
%% API %% API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}. -spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}.
get_topic(Topic, ConnectorState) -> get_topic(Topic, ConnectorState, ReqOpts) ->
#{project_id := ProjectId} = ConnectorState, #{project_id := ProjectId} = ConnectorState,
Method = get, Method = get,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>, Body = <<>>,
PreparedRequest = {prepared_request, {Method, Path, Body}}, PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
?MODULE:query_sync(PreparedRequest, ConnectorState). ?MODULE:query_sync(PreparedRequest, ConnectorState).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -277,19 +275,19 @@ get_jwt_authorization_header(JWTConfig) ->
-spec do_send_requests_sync( -spec do_send_requests_sync(
state(), state(),
{prepared_request, prepared_request()} {prepared_request, prepared_request(), request_opts()}
) -> ) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}. {ok, map()} | {error, {recoverable_error, term()} | term()}.
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) -> do_send_requests_sync(State, {prepared_request, {Method, Path, Body}, ReqOpts}) ->
#{ #{
pool_name := PoolName, pool_name := PoolName,
max_retries := MaxRetries, max_retries := MaxRetries
request_ttl := RequestTTL
} = State, } = State,
#{request_ttl := RequestTTL} = ReqOpts,
?tp( ?tp(
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
#{ #{
request => {prepared_request, {Method, Path, Body}}, request => {prepared_request, {Method, Path, Body}, ReqOpts},
query_mode => sync, query_mode => sync,
resource_id => PoolName resource_id => PoolName
} }
@ -306,20 +304,18 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
-spec do_send_requests_async( -spec do_send_requests_async(
state(), state(),
{prepared_request, prepared_request()}, {prepared_request, prepared_request(), request_opts()},
{ReplyFun :: function(), Args :: list()} {ReplyFun :: function(), Args :: list()}
) -> {ok, pid()} | {error, no_pool_worker_available}. ) -> {ok, pid()} | {error, no_pool_worker_available}.
do_send_requests_async( do_send_requests_async(
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs State, {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs
) -> ) ->
#{ #{pool_name := PoolName} = State,
pool_name := PoolName, #{request_ttl := RequestTTL} = ReqOpts,
request_ttl := RequestTTL
} = State,
?tp( ?tp(
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
#{ #{
request => {prepared_request, {Method, Path, Body}}, request => {prepared_request, {Method, Path, Body}, ReqOpts},
query_mode => async, query_mode => async,
resource_id => PoolName resource_id => PoolName
} }

View File

@ -43,6 +43,7 @@
project_id := emqx_bridge_gcp_pubsub_client:project_id(), project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pull_max_messages := non_neg_integer(), pull_max_messages := non_neg_integer(),
pull_retry_interval := emqx_schema:timeout_duration_ms(), pull_retry_interval := emqx_schema:timeout_duration_ms(),
request_ttl := emqx_schema:duration_ms() | infinity,
subscription_id => subscription_id(), subscription_id => subscription_id(),
topic => emqx_bridge_gcp_pubsub_client:topic() topic => emqx_bridge_gcp_pubsub_client:topic()
}. }.
@ -62,6 +63,7 @@
pull_max_messages := non_neg_integer(), pull_max_messages := non_neg_integer(),
pull_retry_interval := emqx_schema:timeout_duration_ms(), pull_retry_interval := emqx_schema:timeout_duration_ms(),
pull_timer := undefined | reference(), pull_timer := undefined | reference(),
request_ttl := emqx_schema:duration_ms() | infinity,
%% In order to avoid re-processing the same message twice due to race conditions %% In order to avoid re-processing the same message twice due to race conditions
%% between acknlowledging a message and receiving a duplicate pulled message, we need %% between acknlowledging a message and receiving a duplicate pulled message, we need
%% to keep the seen message IDs for a while... %% to keep the seen message IDs for a while...
@ -159,6 +161,7 @@ connect(Opts0) ->
project_id := ProjectId, project_id := ProjectId,
pull_max_messages := PullMaxMessages, pull_max_messages := PullMaxMessages,
pull_retry_interval := PullRetryInterval, pull_retry_interval := PullRetryInterval,
request_ttl := RequestTTL,
topic_mapping := TopicMapping topic_mapping := TopicMapping
} = Opts, } = Opts,
TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)), TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
@ -178,6 +181,7 @@ connect(Opts0) ->
project_id => ProjectId, project_id => ProjectId,
pull_max_messages => PullMaxMessages, pull_max_messages => PullMaxMessages,
pull_retry_interval => PullRetryInterval, pull_retry_interval => PullRetryInterval,
request_ttl => RequestTTL,
topic => Topic, topic => Topic,
subscription_id => subscription_id(BridgeName, Topic) subscription_id => subscription_id(BridgeName, Topic)
}, },
@ -348,13 +352,15 @@ ensure_subscription_exists(State) ->
#{ #{
client := Client, client := Client,
instance_id := InstanceId, instance_id := InstanceId,
request_ttl := RequestTTL,
subscription_id := SubscriptionId, subscription_id := SubscriptionId,
topic := Topic topic := Topic
} = State, } = State,
Method = put, Method = put,
Path = path(State, create), Path = path(State, create),
Body = body(State, create), Body = body(State, create),
PreparedRequest = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of case Res of
{error, #{status_code := 409}} -> {error, #{status_code := 409}} ->
@ -432,12 +438,14 @@ patch_subscription(State) ->
client := Client, client := Client,
instance_id := InstanceId, instance_id := InstanceId,
subscription_id := SubscriptionId, subscription_id := SubscriptionId,
request_ttl := RequestTTL,
topic := Topic topic := Topic
} = State, } = State,
Method1 = patch, Method1 = patch,
Path1 = path(State, create), Path1 = path(State, create),
Body1 = body(State, patch_subscription), Body1 = body(State, patch_subscription),
PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
case Res of case Res of
{ok, _} -> {ok, _} ->
@ -475,12 +483,14 @@ do_pull_async(State0) ->
begin begin
#{ #{
client := Client, client := Client,
instance_id := InstanceId instance_id := InstanceId,
request_ttl := RequestTTL
} = State0, } = State0,
Method = post, Method = post,
Path = path(State0, pull), Path = path(State0, pull),
Body = body(State0, pull), Body = body(State0, pull),
PreparedRequest = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
Res = emqx_bridge_gcp_pubsub_client:query_async( Res = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest, PreparedRequest,
@ -559,13 +569,15 @@ do_acknowledge(State0) ->
#{ #{
client := Client, client := Client,
forget_interval := ForgetInterval, forget_interval := ForgetInterval,
request_ttl := RequestTTL,
pending_acks := PendingAcks pending_acks := PendingAcks
} = State1, } = State1,
AckIds = maps:values(PendingAcks), AckIds = maps:values(PendingAcks),
Method = post, Method = post,
Path = path(State1, ack), Path = path(State1, ack),
Body = body(State1, ack, #{ack_ids => AckIds}), Body = body(State1, ack, #{ack_ids => AckIds}),
PreparedRequest = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}), ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of case Res of
@ -593,12 +605,14 @@ do_acknowledge(State0) ->
-spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}. -spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}.
do_get_subscription(State) -> do_get_subscription(State) ->
#{ #{
client := Client client := Client,
request_ttl := RequestTTL
} = State, } = State,
Method = get, Method = get,
Path = path(State, get_subscription), Path = path(State, get_subscription),
Body = body(State, get_subscription), Body = body(State, get_subscription),
PreparedRequest = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of case Res of
{error, Reason} -> {error, Reason} ->

View File

@ -160,10 +160,12 @@ start_consumers(InstanceId, Client, Config) ->
instance_id => InstanceId, instance_id => InstanceId,
pool_size => PoolSize, pool_size => PoolSize,
project_id => ProjectId, project_id => ProjectId,
pull_retry_interval => RequestTTL pull_retry_interval => RequestTTL,
request_ttl => RequestTTL
}, },
ConsumerOpts = maps:to_list(ConsumerConfig), ConsumerOpts = maps:to_list(ConsumerConfig),
case validate_pubsub_topics(TopicMapping, Client) of ReqOpts = #{request_ttl => RequestTTL},
case validate_pubsub_topics(TopicMapping, Client, ReqOpts) of
ok -> ok ->
ok; ok;
{error, not_found} -> {error, not_found} ->
@ -235,23 +237,23 @@ convert_topic_mapping(TopicMappingList) ->
TopicMappingList TopicMappingList
). ).
validate_pubsub_topics(TopicMapping, Client) -> validate_pubsub_topics(TopicMapping, Client, ReqOpts) ->
PubSubTopics = maps:keys(TopicMapping), PubSubTopics = maps:keys(TopicMapping),
do_validate_pubsub_topics(Client, PubSubTopics). do_validate_pubsub_topics(Client, PubSubTopics, ReqOpts).
do_validate_pubsub_topics(Client, [Topic | Rest]) -> do_validate_pubsub_topics(Client, [Topic | Rest], ReqOpts) ->
case check_for_topic_existence(Topic, Client) of case check_for_topic_existence(Topic, Client, ReqOpts) of
ok -> ok ->
do_validate_pubsub_topics(Client, Rest); do_validate_pubsub_topics(Client, Rest, ReqOpts);
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
do_validate_pubsub_topics(_Client, []) -> do_validate_pubsub_topics(_Client, [], _ReqOpts) ->
%% we already validate that the mapping is not empty in the config schema. %% we already validate that the mapping is not empty in the config schema.
ok. ok.
check_for_topic_existence(Topic, Client) -> check_for_topic_existence(Topic, Client, ReqOpts) ->
Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client), Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client, ReqOpts),
case Res of case Res of
{ok, _} -> {ok, _} ->
ok; ok;

View File

@ -32,7 +32,8 @@
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
ordering_key_template := emqx_placeholder:tmpl_token(), ordering_key_template := emqx_placeholder:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(),
pubsub_topic := binary() pubsub_topic := binary(),
request_ttl := infinity | emqx_schema:duration_ms()
}. }.
-type headers() :: emqx_bridge_gcp_pubsub_client:headers(). -type headers() :: emqx_bridge_gcp_pubsub_client:headers().
-type body() :: emqx_bridge_gcp_pubsub_client:body(). -type body() :: emqx_bridge_gcp_pubsub_client:body().
@ -102,14 +103,18 @@ on_get_status(_InstanceId, #{client := Client} = _State) ->
{ok, map()} {ok, map()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_query(ResourceId, {MessageTag, Selected}, State) -> on_query(ResourceId, {MessageTag, Selected}, ConnectorState) ->
Requests = [{MessageTag, Selected}], Requests = [{MessageTag, Selected}],
?TRACE( ?TRACE(
"QUERY_SYNC", "QUERY_SYNC",
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State} #{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
), ),
do_send_requests_sync(State, Requests, ResourceId). do_send_requests_sync(ConnectorState, Requests, ResourceId).
-spec on_query_async( -spec on_query_async(
connector_resource_id(), connector_resource_id(),
@ -117,15 +122,19 @@ on_query(ResourceId, {MessageTag, Selected}, State) ->
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
connector_state() connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}. ) -> {ok, pid()} | {error, no_pool_worker_available}.
on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) -> on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, ConnectorState) ->
Requests = [{MessageTag, Selected}], Requests = [{MessageTag, Selected}],
?TRACE( ?TRACE(
"QUERY_ASYNC", "QUERY_ASYNC",
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State} #{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
), ),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(State, Requests, ReplyFunAndArgs). do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
-spec on_batch_query( -spec on_batch_query(
connector_resource_id(), connector_resource_id(),
@ -135,13 +144,17 @@ on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) ->
{ok, map()} {ok, map()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_batch_query(ResourceId, Requests, State) -> on_batch_query(ResourceId, Requests, ConnectorState) ->
?TRACE( ?TRACE(
"QUERY_SYNC", "QUERY_SYNC",
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State} #{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
), ),
do_send_requests_sync(State, Requests, ResourceId). do_send_requests_sync(ConnectorState, Requests, ResourceId).
-spec on_batch_query_async( -spec on_batch_query_async(
connector_resource_id(), connector_resource_id(),
@ -149,14 +162,18 @@ on_batch_query(ResourceId, Requests, State) ->
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
connector_state() connector_state()
) -> {ok, pid()} | {error, no_pool_worker_available}. ) -> {ok, pid()} | {error, no_pool_worker_available}.
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) ->
?TRACE( ?TRACE(
"QUERY_ASYNC", "QUERY_ASYNC",
"gcp_pubsub_received", "gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State} #{
requests => Requests,
connector => ResourceId,
state => emqx_utils:redact(ConnectorState)
}
), ),
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
do_send_requests_async(State, Requests, ReplyFunAndArgs). do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs).
-spec on_add_channel( -spec on_add_channel(
connector_resource_id(), connector_resource_id(),
@ -207,13 +224,17 @@ install_channel(ActionConfig) ->
ordering_key_template := OrderingKeyTemplate, ordering_key_template := OrderingKeyTemplate,
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic pubsub_topic := PubSubTopic
},
resource_opts := #{
request_ttl := RequestTTL
} }
} = ActionConfig, } = ActionConfig,
#{ #{
attributes_template => preproc_attributes(AttributesTemplate), attributes_template => preproc_attributes(AttributesTemplate),
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pubsub_topic => PubSubTopic pubsub_topic => PubSubTopic,
request_ttl => RequestTTL
}. }.
-spec do_send_requests_sync( -spec do_send_requests_sync(
@ -231,7 +252,7 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
%% is it safe to assume the tag is the same??? And not empty??? %% is it safe to assume the tag is the same??? And not empty???
[{MessageTag, _} | _] = Requests, [{MessageTag, _} | _] = Requests,
#{installed_actions := InstalledActions} = ConnectorState, #{installed_actions := InstalledActions} = ConnectorState,
ChannelState = maps:get(MessageTag, InstalledActions), ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions),
Payloads = Payloads =
lists:map( lists:map(
fun({_MessageTag, Selected}) -> fun({_MessageTag, Selected}) ->
@ -242,7 +263,8 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
Body = to_pubsub_request(Payloads), Body = to_pubsub_request(Payloads),
Path = publish_path(ConnectorState, ChannelState), Path = publish_path(ConnectorState, ChannelState),
Method = post, Method = post,
Request = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
Request = {prepared_request, {Method, Path, Body}, ReqOpts},
Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
QueryMode = sync, QueryMode = sync,
handle_result(Result, Request, QueryMode, InstanceId). handle_result(Result, Request, QueryMode, InstanceId).
@ -257,7 +279,7 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
%% is it safe to assume the tag is the same??? And not empty??? %% is it safe to assume the tag is the same??? And not empty???
[{MessageTag, _} | _] = Requests, [{MessageTag, _} | _] = Requests,
#{installed_actions := InstalledActions} = ConnectorState, #{installed_actions := InstalledActions} = ConnectorState,
ChannelState = maps:get(MessageTag, InstalledActions), ChannelState = #{request_ttl := RequestTTL} = maps:get(MessageTag, InstalledActions),
Payloads = Payloads =
lists:map( lists:map(
fun({_MessageTag, Selected}) -> fun({_MessageTag, Selected}) ->
@ -268,7 +290,8 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
Body = to_pubsub_request(Payloads), Body = to_pubsub_request(Payloads),
Path = publish_path(ConnectorState, ChannelState), Path = publish_path(ConnectorState, ChannelState),
Method = post, Method = post,
Request = {prepared_request, {Method, Path, Body}}, ReqOpts = #{request_ttl => RequestTTL},
Request = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
emqx_bridge_gcp_pubsub_client:query_async( emqx_bridge_gcp_pubsub_client:query_async(
Request, ReplyFunAndArgs, Client Request, ReplyFunAndArgs, Client

View File

@ -80,6 +80,8 @@ fields("config_connector") ->
%% FIXME %% FIXME
emqx_connector_schema:common_fields() ++ emqx_connector_schema:common_fields() ++
connector_config_fields(); connector_config_fields();
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
%%========================================= %%=========================================
%% HTTP API fields: action %% HTTP API fields: action
%%========================================= %%=========================================
@ -101,7 +103,7 @@ fields(Field) when
connector_config_fields() -> connector_config_fields() ->
emqx_bridge_gcp_pubsub:fields(connector_config) ++ emqx_bridge_gcp_pubsub:fields(connector_config) ++
emqx_resource_schema:fields("resource_opts"). emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).
desc("config_connector") -> desc("config_connector") ->
?DESC("config_connector"); ?DESC("config_connector");
@ -109,6 +111,8 @@ desc(action_parameters) ->
?DESC(action_parameters); ?DESC(action_parameters);
desc(producer_action) -> desc(producer_action) ->
?DESC(producer_action); ?DESC(producer_action);
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) -> desc(_Name) ->
undefined. undefined.

View File

@ -14,6 +14,12 @@
-define(BRIDGE_TYPE, gcp_pubsub_consumer). -define(BRIDGE_TYPE, gcp_pubsub_consumer).
-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>). -define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
-define(REPUBLISH_TOPIC, <<"republish/t">>). -define(REPUBLISH_TOPIC, <<"republish/t">>).
-define(PREPARED_REQUEST(METHOD, PATH, BODY),
{prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}}
).
-define(PREPARED_REQUEST_PAT(METHOD, PATH, BODY),
{prepared_request, {METHOD, PATH, BODY}, _}
).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -40,11 +46,13 @@ init_per_suite(Config) ->
emqx_conf, emqx_conf,
emqx_bridge_gcp_pubsub, emqx_bridge_gcp_pubsub,
emqx_bridge, emqx_bridge,
emqx_rule_engine emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
emqx_mgmt_api_test_util:init_suite(), {ok, _Api} = emqx_common_test_http:create_default_app(),
HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr, HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
Client = start_control_client(), Client = start_control_client(),
@ -71,7 +79,6 @@ end_per_suite(Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
Client = ?config(client, Config), Client = ?config(client, Config),
stop_control_client(Client), stop_control_client(Client),
emqx_mgmt_api_test_util:end_suite(),
emqx_cth_suite:stop(Apps), emqx_cth_suite:stop(Apps),
os:unsetenv("PUBSUB_EMULATOR_HOST"), os:unsetenv("PUBSUB_EMULATOR_HOST"),
ok. ok.
@ -266,7 +273,7 @@ ensure_topic(Config, Topic) ->
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<"{}">>, Body = <<"{}">>,
Res = emqx_bridge_gcp_pubsub_client:query_sync( Res = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}}, ?PREPARED_REQUEST(Method, Path, Body),
Client Client
), ),
case Res of case Res of
@ -285,7 +292,6 @@ start_control_client() ->
connect_timeout => 5_000, connect_timeout => 5_000,
max_retries => 0, max_retries => 0,
pool_size => 1, pool_size => 1,
resource_opts => #{request_ttl => 1_000},
service_account_json => RawServiceAccount service_account_json => RawServiceAccount
}, },
PoolName = <<"control_connector">>, PoolName = <<"control_connector">>,
@ -317,7 +323,7 @@ pubsub_publish(Config, Topic, Messages0) ->
), ),
Body = emqx_utils_json:encode(#{<<"messages">> => Messages}), Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}}, ?PREPARED_REQUEST(Method, Path, Body),
Client Client
), ),
ok. ok.
@ -329,7 +335,7 @@ delete_topic(Config, Topic) ->
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>, Body = <<>>,
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}}, ?PREPARED_REQUEST(Method, Path, Body),
Client Client
), ),
ok. ok.
@ -341,7 +347,7 @@ delete_subscription(Config, SubscriptionId) ->
Path = <<"/v1/projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>, Path = <<"/v1/projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>,
Body = <<>>, Body = <<>>,
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}}, ?PREPARED_REQUEST(Method, Path, Body),
Client Client
), ),
ok. ok.
@ -1994,7 +2000,7 @@ t_connection_down_during_ack_redeliver(Config) ->
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client, emqx_bridge_gcp_pubsub_client,
query_sync, query_sync,
fun(PreparedRequest = {prepared_request, {_Method, Path, _Body}}, Client) -> fun(PreparedRequest = ?PREPARED_REQUEST_PAT(_Method, Path, _Body), Client) ->
case re:run(Path, <<":acknowledge$">>) of case re:run(Path, <<":acknowledge$">>) of
{match, _} -> {match, _} ->
ct:sleep(800), ct:sleep(800),
@ -2162,7 +2168,7 @@ t_permission_denied_topic_check(Config) ->
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client, emqx_bridge_gcp_pubsub_client,
query_sync, query_sync,
fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) ->
RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
case {Method =:= get, re:run(Path, RE)} of case {Method =:= get, re:run(Path, RE)} of
{true, {match, _}} -> {true, {match, _}} ->
@ -2201,7 +2207,7 @@ t_permission_denied_worker(Config) ->
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client, emqx_bridge_gcp_pubsub_client,
query_sync, query_sync,
fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) ->
case Method =:= put of case Method =:= put of
true -> true ->
permission_denied_response(); permission_denied_response();
@ -2237,7 +2243,7 @@ t_unauthenticated_topic_check(Config) ->
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client, emqx_bridge_gcp_pubsub_client,
query_sync, query_sync,
fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, Path, _Body), Client) ->
RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
case {Method =:= get, re:run(Path, RE)} of case {Method =:= get, re:run(Path, RE)} of
{true, {match, _}} -> {true, {match, _}} ->
@ -2276,7 +2282,7 @@ t_unauthenticated_worker(Config) ->
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_client, emqx_bridge_gcp_pubsub_client,
query_sync, query_sync,
fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> fun(PreparedRequest = ?PREPARED_REQUEST_PAT(Method, _Path, _Body), Client) ->
case Method =:= put of case Method =:= put of
true -> true ->
unauthenticated_response(); unauthenticated_response();

View File

@ -76,18 +76,25 @@ only_sync_tests() ->
[t_query_sync]. [t_query_sync].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]), Apps = emqx_cth_suite:start(
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), [
{ok, _} = application:ensure_all_started(emqx_connector), emqx,
emqx_mgmt_api_test_util:init_suite(), emqx_conf,
emqx_bridge_gcp_pubsub,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls), persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls),
Config. [{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(), Apps = ?config(apps, Config),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), emqx_cth_suite:stop(Apps),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
_ = application:stop(emqx_connector),
persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}), persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}),
ok. ok.

View File

@ -0,0 +1,2 @@
Fixed the `resource_opts` configuration schema for the GCP PubSub Producer connector so that it contains only relevant fields.
This affects the creation of GCP PubSub Producer connectors via HOCON configuration (`connectors.gcp_pubsub_producer.*.resource_opts`) and the HTTP APIs `POST /connectors` / `PUT /connectors/:id` for this particular connector type.