refactor(gcp_pubsub): transform connector into opaque client

This commit is contained in:
Thales Macedo Garitezi 2023-06-20 15:27:42 -03:00
parent ffce6fefa8
commit 0463828e84
6 changed files with 139 additions and 154 deletions

View File

@ -2,9 +2,7 @@
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_connector).
-behaviour(emqx_resource).
-module(emqx_bridge_gcp_pubsub_client).
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
@ -13,18 +11,17 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% `emqx_resource' API
%% API
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_get_status/2
start/2,
stop/1,
query_sync/2,
query_async/3,
get_status/1
]).
-export([reply_delegator/3]).
-export([get_topic/3]).
-export([get_topic/2]).
-export([get_jwt_authorization_header/1]).
@ -37,7 +34,7 @@
service_account_json := service_account_json(),
any() => term()
}.
-type state() :: #{
-opaque state() :: #{
connect_timeout := timer:time(),
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
@ -66,13 +63,11 @@
-define(DEFAULT_PIPELINE_SIZE, 100).
%%-------------------------------------------------------------------------------------------------
%% emqx_resource API
%% API
%%-------------------------------------------------------------------------------------------------
callback_mode() -> async_if_possible.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(
-spec start(resource_id(), config()) -> {ok, state()} | {error, term()}.
start(
ResourceId,
#{
connect_timeout := ConnectTimeout,
@ -81,11 +76,6 @@ on_start(
resource_opts := #{request_ttl := RequestTTL}
} = Config
) ->
?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge",
connector => ResourceId,
config => Config
}),
{Transport, HostPort} = get_transport(),
#{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
PoolType = random,
@ -141,8 +131,8 @@ on_start(
{error, Reason}
end.
-spec on_stop(resource_id(), state() | undefined) -> ok | {error, term()}.
on_stop(ResourceId, _State) ->
-spec stop(resource_id()) -> ok | {error, term()}.
stop(ResourceId) ->
?tp(gcp_pubsub_stop, #{resource_id => ResourceId}),
?SLOG(info, #{
msg => "stopping_gcp_pubsub_bridge",
@ -158,42 +148,41 @@ on_stop(ResourceId, _State) ->
Error
end.
-spec on_query(
resource_id(),
-spec query_sync(
{prepared_request, prepared_request()},
state()
) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}.
on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
PoolName = maps:get(pool_name, State),
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => PreparedRequest, connector => ResourceId, state => State}
#{requests => PreparedRequest, connector => PoolName, state => State}
),
do_send_requests_sync(State, {prepared_request, PreparedRequest}, ResourceId).
do_send_requests_sync(State, {prepared_request, PreparedRequest}).
-spec on_query_async(
resource_id(),
-spec query_async(
{prepared_request, prepared_request()},
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_query_async(
ResourceId,
query_async(
{prepared_request, PreparedRequest = {_Method, _Path, _Body}},
ReplyFunAndArgs,
State
) ->
PoolName = maps:get(pool_name, State),
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => PreparedRequest, connector => ResourceId, state => State}
#{requests => PreparedRequest, connector => PoolName, state => State}
),
do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs, ResourceId).
do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs).
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
case do_get_status(ResourceId, Timeout) of
-spec get_status(state()) -> connected | disconnected.
get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
case do_get_status(PoolName, Timeout) of
true ->
connected;
false ->
@ -208,14 +197,14 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
%% API
%%-------------------------------------------------------------------------------------------------
-spec get_topic(resource_id(), topic(), state()) -> {ok, map()} | {error, term()}.
get_topic(ResourceId, Topic, ConnectorState) ->
-spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}.
get_topic(Topic, ConnectorState) ->
#{project_id := ProjectId} = ConnectorState,
Method = get,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>,
PreparedRequest = {prepared_request, {Method, Path, Body}},
on_query(ResourceId, PreparedRequest, ConnectorState).
query_sync(PreparedRequest, ConnectorState).
%%-------------------------------------------------------------------------------------------------
%% Helper fns
@ -286,11 +275,10 @@ get_jwt_authorization_header(JWTConfig) ->
-spec do_send_requests_sync(
state(),
{prepared_request, prepared_request()},
resource_id()
{prepared_request, prepared_request()}
) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}.
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) ->
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
#{
pool_name := PoolName,
max_retries := MaxRetries,
@ -301,7 +289,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
#{
request => {prepared_request, {Method, Path, Body}},
query_mode => sync,
resource_id => ResourceId
resource_id => PoolName
}
),
Request = to_ehttpc_request(State, Method, Path, Body),
@ -312,16 +300,15 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
RequestTTL,
MaxRetries
),
handle_response(Response, ResourceId, _QueryMode = sync).
handle_response(Response, PoolName, _QueryMode = sync).
-spec do_send_requests_async(
state(),
{prepared_request, prepared_request()},
{ReplyFun :: function(), Args :: list()},
resource_id()
{ReplyFun :: function(), Args :: list()}
) -> {ok, pid()}.
do_send_requests_async(
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs
) ->
#{
pool_name := PoolName,
@ -332,7 +319,7 @@ do_send_requests_async(
#{
request => {prepared_request, {Method, Path, Body}},
query_mode => async,
resource_id => ResourceId
resource_id => PoolName
}
),
Request = to_ehttpc_request(State, Method, Path, Body),
@ -342,7 +329,7 @@ do_send_requests_async(
Method,
Request,
RequestTTL,
{fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
{fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]}
),
{ok, Worker}.

View File

@ -31,29 +31,31 @@
-type ack_id() :: binary().
-type config() :: #{
ack_retry_interval := emqx_schema:timeout_duration_ms(),
connector_state := emqx_bridge_gcp_pubsub_connector:state(),
client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id => non_neg_integer(),
hookpoint := binary(),
instance_id := binary(),
mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pull_max_messages := non_neg_integer(),
subscription_id => subscription_id(),
topic => emqx_bridge_gcp_pubsub_connector:topic()
topic => emqx_bridge_gcp_pubsub_client:topic()
}.
-type state() :: #{
ack_retry_interval := emqx_schema:timeout_duration_ms(),
ack_timer := undefined | reference(),
async_workers := #{pid() => reference()},
connector_state := emqx_bridge_gcp_pubsub_connector:state(),
client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id := non_neg_integer(),
hookpoint := binary(),
instance_id := binary(),
mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
pending_acks => [ack_id()],
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pull_max_messages := non_neg_integer(),
pull_timer := undefined | reference(),
subscription_id => subscription_id(),
topic => emqx_bridge_gcp_pubsub_connector:topic()
topic => emqx_bridge_gcp_pubsub_client:topic()
}.
-type decoded_message() :: map().
@ -129,10 +131,11 @@ connect(Opts0) ->
#{
ack_retry_interval := AckRetryInterval,
bridge_name := BridgeName,
connector_state := ConnectorState,
client := Client,
ecpool_worker_id := WorkerId,
hookpoint := Hookpoint,
instance_id := InstanceId,
project_id := ProjectId,
pull_max_messages := PullMaxMessages,
topic_mapping := TopicMapping
} = Opts,
@ -141,13 +144,14 @@ connect(Opts0) ->
{Topic, MQTTConfig} = lists:nth(Index, TopicMappingList),
Config = #{
ack_retry_interval => AckRetryInterval,
%% Note: the `connector_state' value here must be immutable and not changed by the
%% Note: the `client' value here must be immutable and not changed by the
%% bridge during `on_get_status', since we have handed it over to the pull
%% workers.
connector_state => ConnectorState,
client => Client,
hookpoint => Hookpoint,
instance_id => InstanceId,
mqtt_config => MQTTConfig,
project_id => ProjectId,
pull_max_messages => PullMaxMessages,
topic => Topic,
subscription_id => subscription_id(BridgeName, Topic)
@ -264,7 +268,7 @@ ensure_pull_timer(State) ->
-spec ensure_subscription_exists(state()) -> ok | error.
ensure_subscription_exists(State) ->
#{
connector_state := ConnectorState,
client := Client,
instance_id := InstanceId,
subscription_id := SubscriptionId,
topic := Topic
@ -273,7 +277,7 @@ ensure_subscription_exists(State) ->
Path = path(State, create),
Body = body(State, create),
PreparedRequest = {prepared_request, {Method, Path, Body}},
Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
{error, #{status_code := 409}} ->
%% already exists
@ -287,9 +291,7 @@ ensure_subscription_exists(State) ->
Path1 = path(State, create),
Body1 = body(State, patch_subscription),
PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
Res1 = emqx_bridge_gcp_pubsub_connector:on_query(
InstanceId, PreparedRequest1, ConnectorState
),
Res1 = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
?SLOG(debug, #{
msg => "gcp_pubsub_consumer_worker_subscription_patch",
instance_id => InstanceId,
@ -319,7 +321,7 @@ ensure_subscription_exists(State) ->
%% We use async requests so that this process will be more responsive to system messages.
do_pull_async(State) ->
#{
connector_state := ConnectorState,
client := Client,
instance_id := InstanceId
} = State,
Method = post,
@ -327,11 +329,10 @@ do_pull_async(State) ->
Body = body(State, pull),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]},
{ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_connector:on_query_async(
InstanceId,
{ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest,
ReplyFunAndArgs,
ConnectorState
Client
),
ensure_async_worker_monitored(State, AsyncWorkerPid).
@ -361,15 +362,14 @@ acknowledge(State0 = #{pending_acks := []}) ->
acknowledge(State0) ->
State1 = State0#{ack_timer := undefined},
#{
connector_state := ConnectorState,
instance_id := InstanceId,
client := Client,
pending_acks := AckIds
} = State1,
Method = post,
Path = path(State1, ack),
Body = body(State1, ack, #{ack_ids => AckIds}),
PreparedRequest = {prepared_request, {Method, Path, Body}},
Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
{error, Reason} ->
?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}),
@ -384,14 +384,13 @@ acknowledge(State0) ->
do_get_subscription(State) ->
#{
connector_state := ConnectorState,
instance_id := InstanceId
client := Client
} = State,
Method = get,
Path = path(State, get_subscription),
Body = body(State, get_subscription),
PreparedRequest = {prepared_request, {Method, Path, Body}},
Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of
{error, Reason} ->
?SLOG(warning, #{
@ -410,7 +409,7 @@ do_get_subscription(State) ->
{error, Details}
end.
-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_connector:topic()) -> subscription_id().
-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_client:topic()) -> subscription_id().
subscription_id(BridgeName0, Topic) ->
%% The real GCP PubSub accepts colons in subscription names, but its emulator
%% doesn't... We currently validate bridge names to not include that character. The
@ -422,7 +421,7 @@ subscription_id(BridgeName0, Topic) ->
-spec path(state(), pull | create | ack | get_subscription) -> binary().
path(State, Type) ->
#{
connector_state := #{project_id := ProjectId},
client := #{project_id := ProjectId},
subscription_id := SubscriptionId
} = State,
SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
@ -444,7 +443,7 @@ body(State, pull) ->
body(State, create) ->
#{
ack_retry_interval := AckRetryInterval,
connector_state := #{project_id := ProjectId},
project_id := ProjectId,
topic := PubSubTopic
} = State,
TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
@ -457,7 +456,7 @@ body(State, create) ->
body(State, patch_subscription) ->
#{
ack_retry_interval := AckRetryInterval,
connector_state := #{project_id := ProjectId},
project_id := ProjectId,
topic := PubSubTopic,
subscription_id := SubscriptionId
} = State,
@ -484,7 +483,7 @@ body(_State, ack, Opts) ->
JSON = #{<<"ackIds">> => AckIds},
emqx_utils_json:encode(JSON).
-spec subscription_resource(emqx_bridge_gcp_pubsub_connector:project_id(), subscription_id()) ->
-spec subscription_resource(emqx_bridge_gcp_pubsub_client:project_id(), subscription_id()) ->
binary().
subscription_resource(ProjectId, SubscriptionId) ->
<<"projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>.

View File

@ -29,11 +29,11 @@
max_retries := non_neg_integer(),
pool_size := non_neg_integer(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(),
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
any() => term()
}.
-type state() :: #{
connector_state := emqx_bridge_gcp_pubsub_connector:state()
client := emqx_bridge_gcp_pubsub_client:state()
}.
-export_type([mqtt_config/0]).
@ -52,24 +52,21 @@ query_mode(_Config) -> no_queries.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, Config) ->
case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of
{ok, ConnectorState} ->
start_consumers(InstanceId, ConnectorState, Config);
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
{ok, Client} ->
start_consumers(InstanceId, Client, Config);
Error ->
Error
end.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
on_stop(InstanceId, #{connector_state := ConnectorState}) ->
on_stop(InstanceId, _State) ->
ok = stop_consumers(InstanceId),
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState);
on_stop(InstanceId, undefined = _State) ->
ok = stop_consumers(InstanceId),
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined).
emqx_bridge_gcp_pubsub_client:stop(InstanceId).
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, _State) ->
%% Note: do *not* alter the `connector_state' value here. It must be immutable, since
%% Note: do *not* alter the `client' value here. It must be immutable, since
%% we have handed it over to the pull workers.
case
emqx_resource_pool:health_check_workers(
@ -85,11 +82,12 @@ on_get_status(InstanceId, _State) ->
%% Internal fns
%%-------------------------------------------------------------------------------------------------
start_consumers(InstanceId, ConnectorState, Config) ->
start_consumers(InstanceId, Client, Config) ->
#{
bridge_name := BridgeName,
consumer := ConsumerConfig0,
hookpoint := Hookpoint
hookpoint := Hookpoint,
service_account_json := #{project_id := ProjectId}
} = Config,
ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
@ -98,18 +96,19 @@ start_consumers(InstanceId, ConnectorState, Config) ->
ConsumerConfig = ConsumerConfig1#{
auto_reconnect => ?AUTO_RECONNECT_S,
bridge_name => BridgeName,
connector_state => ConnectorState,
client => Client,
hookpoint => Hookpoint,
instance_id => InstanceId,
pool_size => PoolSize
pool_size => PoolSize,
project_id => ProjectId
},
ConsumerOpts = maps:to_list(ConsumerConfig),
%% FIXME: mark as unhealthy if topics do not exist!
case validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) of
case validate_pubsub_topics(TopicMapping, Client) of
ok ->
ok;
error ->
_ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
throw(
"GCP PubSub topics are invalid. Please check the logs, check if the "
"topic exists in GCP and if the service account has permissions to use them."
@ -120,12 +119,12 @@ start_consumers(InstanceId, ConnectorState, Config) ->
of
ok ->
State = #{
connector_state => ConnectorState,
client => Client,
pool_name => InstanceId
},
{ok, State};
{error, Reason} ->
_ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
{error, Reason}
end.
@ -163,23 +162,23 @@ convert_topic_mapping(TopicMappingList) ->
TopicMappingList
).
validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) ->
validate_pubsub_topics(TopicMapping, Client) ->
PubSubTopics = maps:keys(TopicMapping),
do_validate_pubsub_topics(InstanceId, ConnectorState, PubSubTopics).
do_validate_pubsub_topics(Client, PubSubTopics).
do_validate_pubsub_topics(InstanceId, ConnectorState, [Topic | Rest]) ->
case check_for_topic_existence(InstanceId, Topic, ConnectorState) of
do_validate_pubsub_topics(Client, [Topic | Rest]) ->
case check_for_topic_existence(Topic, Client) of
ok ->
do_validate_pubsub_topics(InstanceId, ConnectorState, Rest);
do_validate_pubsub_topics(Client, Rest);
{error, _} ->
error
end;
do_validate_pubsub_topics(_InstanceId, _ConnectorState, []) ->
do_validate_pubsub_topics(_Client, []) ->
%% we already validate that the mapping is not empty in the config schema.
ok.
check_for_topic_existence(InstanceId, Topic, ConnectorState) ->
Res = emqx_bridge_gcp_pubsub_connector:get_topic(InstanceId, Topic, ConnectorState),
check_for_topic_existence(Topic, Client) ->
Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client),
case Res of
{ok, _} ->
ok;

View File

@ -13,17 +13,18 @@
max_retries := non_neg_integer(),
pubsub_topic := binary(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(),
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
any() => term()
}.
-type state() :: #{
connector_state := emqx_bridge_gcp_pubsub_connector:state(),
client := emqx_bridge_gcp_pubsub_client:state(),
payload_template := emqx_placeholder:tmpl_token(),
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pubsub_topic := binary()
}.
-type headers() :: emqx_bridge_gcp_pubsub_connector:headers().
-type body() :: emqx_bridge_gcp_pubsub_connector:body().
-type status_code() :: emqx_bridge_gcp_pubsub_connector:status_code().
-type headers() :: emqx_bridge_gcp_pubsub_client:headers().
-type body() :: emqx_bridge_gcp_pubsub_client:body().
-type status_code() :: emqx_bridge_gcp_pubsub_client:status_code().
%% `emqx_resource' API
-export([
@ -50,15 +51,21 @@ query_mode(_Config) -> async.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, Config) ->
?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge",
config => Config
}),
#{
payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic
pubsub_topic := PubSubTopic,
service_account_json := #{project_id := ProjectId}
} = Config,
case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of
{ok, ConnectorState} ->
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
{ok, Client} ->
State = #{
connector_state => ConnectorState,
client => Client,
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
project_id => ProjectId,
pubsub_topic => PubSubTopic
},
{ok, State};
@ -67,22 +74,19 @@ on_start(InstanceId, Config) ->
end.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
on_stop(InstanceId, #{connector_state := ConnectorState}) ->
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState);
on_stop(InstanceId, undefined = _State) ->
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined).
on_stop(InstanceId, _State) ->
emqx_bridge_gcp_pubsub_client:stop(InstanceId).
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, #{connector_state := ConnectorState} = _State) ->
emqx_bridge_gcp_pubsub_connector:on_get_status(InstanceId, ConnectorState).
on_get_status(_InstanceId, #{client := Client} = _State) ->
emqx_bridge_gcp_pubsub_client:get_status(Client).
-spec on_query(
resource_id(),
{send_message, map()},
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(ResourceId, {send_message, Selected}, State) ->
@ -107,15 +111,14 @@ on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
do_send_requests_async(State, Requests, ReplyFunAndArgs).
-spec on_batch_query(
resource_id(),
[{send_message, map()}],
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
{ok, map()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(ResourceId, Requests, State) ->
@ -138,7 +141,7 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
do_send_requests_async(State, Requests, ReplyFunAndArgs).
%%-------------------------------------------------------------------------------------------------
%% Helper fns
@ -154,7 +157,7 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
| {error, {recoverable_error, term()}}
| {error, term()}.
do_send_requests_sync(State, Requests, InstanceId) ->
#{connector_state := ConnectorState} = State,
#{client := Client} = State,
Payloads =
lists:map(
fun({send_message, Selected}) ->
@ -166,18 +169,17 @@ do_send_requests_sync(State, Requests, InstanceId) ->
Path = publish_path(State),
Method = post,
Request = {prepared_request, {Method, Path, Body}},
Result = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState),
Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
QueryMode = sync,
handle_result(Result, Request, QueryMode, InstanceId).
-spec do_send_requests_async(
state(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
resource_id()
{ReplyFun :: function(), Args :: list()}
) -> {ok, pid()}.
do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) ->
#{connector_state := ConnectorState} = State,
do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
#{client := Client} = State,
Payloads =
lists:map(
fun({send_message, Selected}) ->
@ -190,8 +192,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) ->
Method = post,
Request = {prepared_request, {Method, Path, Body}},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
emqx_bridge_gcp_pubsub_connector:on_query_async(
InstanceId, Request, ReplyFunAndArgs, ConnectorState
emqx_bridge_gcp_pubsub_client:query_async(
Request, ReplyFunAndArgs, Client
).
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
@ -210,7 +212,7 @@ to_pubsub_request(Payloads) ->
-spec publish_path(state()) -> binary().
publish_path(
_State = #{
connector_state := #{project_id := ProjectId},
project_id := ProjectId,
pubsub_topic := PubSubTopic
}
) ->

View File

@ -44,14 +44,14 @@ init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(),
HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
ConnectorState = start_control_connector(),
Client = start_control_connector(),
[
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{gcp_emulator_host, GCPEmulatorHost},
{gcp_emulator_port, GCPEmulatorPort},
{connector_state, ConnectorState}
{client, Client}
| Config
];
false ->
@ -64,8 +64,8 @@ init_per_suite(Config) ->
end.
end_per_suite(Config) ->
ConnectorState = ?config(connector_state, Config),
stop_control_connector(ConnectorState),
Client = ?config(client, Config),
stop_control_connector(Client),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
@ -229,14 +229,13 @@ ensure_topics(Config) ->
ensure_topic(Config, Topic) ->
ProjectId = ?config(project_id, Config),
ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
Client = ?config(client, Config),
Method = put,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<"{}">>,
Res = emqx_bridge_gcp_pubsub_connector:on_query(
PoolName,
Res = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
ConnectorState
Client
),
case Res of
{ok, _} ->
@ -259,16 +258,15 @@ start_control_connector() ->
service_account_json => ServiceAccount
},
PoolName = <<"control_connector">>,
{ok, ConnectorState} = emqx_bridge_gcp_pubsub_connector:on_start(PoolName, ConnectorConfig),
ConnectorState.
{ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig),
Client.
stop_control_connector(ConnectorState) ->
#{pool_name := PoolName} = ConnectorState,
ok = emqx_bridge_gcp_pubsub_connector:on_stop(PoolName, ConnectorState),
stop_control_connector(Client) ->
ok = emqx_bridge_gcp_pubsub_client:stop(Client),
ok.
pubsub_publish(Config, Topic, Messages0) ->
ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
Client = ?config(client, Config),
ProjectId = ?config(project_id, Config),
Method = post,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary, ":publish">>,
@ -287,10 +285,9 @@ pubsub_publish(Config, Topic, Messages0) ->
Messages0
),
Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
{ok, _} = emqx_bridge_gcp_pubsub_connector:on_query(
PoolName,
{ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(
{prepared_request, {Method, Path, Body}},
ConnectorState
Client
),
ok.
@ -688,3 +685,4 @@ t_bridge_rule_action_source(Config) ->
%% * connection down during ack
%% * topic deleted while consumer is running
%% * subscription deleted while consumer is running
%% * ensure client is terminated when bridge stops

View File

@ -74,7 +74,7 @@ init_per_suite(Config) ->
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
persistent_term:put({emqx_bridge_gcp_pubsub_connector, transport}, tls),
persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls),
Config.
end_per_suite(_Config) ->
@ -82,7 +82,7 @@ end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
_ = application:stop(emqx_connector),
persistent_term:erase({emqx_bridge_gcp_pubsub_connector, transport}),
persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}),
ok.
init_per_group(sync_query, Config) ->