diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl similarity index 88% rename from apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl rename to apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index 35a4a7797..f2fa87d38 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -2,9 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_gcp_pubsub_connector). - --behaviour(emqx_resource). +-module(emqx_bridge_gcp_pubsub_client). -include_lib("jose/include/jose_jwk.hrl"). -include_lib("emqx_connector/include/emqx_connector_tables.hrl"). @@ -13,18 +11,17 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -%% `emqx_resource' API +%% API -export([ - callback_mode/0, - on_start/2, - on_stop/2, - on_query/3, - on_query_async/4, - on_get_status/2 + start/2, + stop/1, + query_sync/2, + query_async/3, + get_status/1 ]). -export([reply_delegator/3]). --export([get_topic/3]). +-export([get_topic/2]). -export([get_jwt_authorization_header/1]). @@ -37,7 +34,7 @@ service_account_json := service_account_json(), any() => term() }. --type state() :: #{ +-opaque state() :: #{ connect_timeout := timer:time(), jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), @@ -66,13 +63,11 @@ -define(DEFAULT_PIPELINE_SIZE, 100). %%------------------------------------------------------------------------------------------------- -%% emqx_resource API +%% API %%------------------------------------------------------------------------------------------------- -callback_mode() -> async_if_possible. - --spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -on_start( +-spec start(resource_id(), config()) -> {ok, state()} | {error, term()}. +start( ResourceId, #{ connect_timeout := ConnectTimeout, @@ -81,11 +76,6 @@ on_start( resource_opts := #{request_ttl := RequestTTL} } = Config ) -> - ?SLOG(info, #{ - msg => "starting_gcp_pubsub_bridge", - connector => ResourceId, - config => Config - }), {Transport, HostPort} = get_transport(), #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), PoolType = random, @@ -141,8 +131,8 @@ on_start( {error, Reason} end. --spec on_stop(resource_id(), state() | undefined) -> ok | {error, term()}. -on_stop(ResourceId, _State) -> +-spec stop(resource_id()) -> ok | {error, term()}. +stop(ResourceId) -> ?tp(gcp_pubsub_stop, #{resource_id => ResourceId}), ?SLOG(info, #{ msg => "stopping_gcp_pubsub_bridge", @@ -158,42 +148,41 @@ on_stop(ResourceId, _State) -> Error end. --spec on_query( - resource_id(), +-spec query_sync( {prepared_request, prepared_request()}, state() ) -> {ok, map()} | {error, {recoverable_error, term()} | term()}. -on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) -> +query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) -> + PoolName = maps:get(pool_name, State), ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => PreparedRequest, connector => ResourceId, state => State} + #{requests => PreparedRequest, connector => PoolName, state => State} ), - do_send_requests_sync(State, {prepared_request, PreparedRequest}, ResourceId). + do_send_requests_sync(State, {prepared_request, PreparedRequest}). --spec on_query_async( - resource_id(), +-spec query_async( {prepared_request, prepared_request()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_query_async( - ResourceId, +query_async( {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, ReplyFunAndArgs, State ) -> + PoolName = maps:get(pool_name, State), ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => PreparedRequest, connector => ResourceId, state => State} + #{requests => PreparedRequest, connector => PoolName, state => State} ), - do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs, ResourceId). + do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs). --spec on_get_status(resource_id(), state()) -> connected | disconnected. -on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> - case do_get_status(ResourceId, Timeout) of +-spec get_status(state()) -> connected | disconnected. +get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> + case do_get_status(PoolName, Timeout) of true -> connected; false -> @@ -208,14 +197,14 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> %% API %%------------------------------------------------------------------------------------------------- --spec get_topic(resource_id(), topic(), state()) -> {ok, map()} | {error, term()}. -get_topic(ResourceId, Topic, ConnectorState) -> +-spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}. +get_topic(Topic, ConnectorState) -> #{project_id := ProjectId} = ConnectorState, Method = get, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, PreparedRequest = {prepared_request, {Method, Path, Body}}, - on_query(ResourceId, PreparedRequest, ConnectorState). + query_sync(PreparedRequest, ConnectorState). %%------------------------------------------------------------------------------------------------- %% Helper fns @@ -286,11 +275,10 @@ get_jwt_authorization_header(JWTConfig) -> -spec do_send_requests_sync( state(), - {prepared_request, prepared_request()}, - resource_id() + {prepared_request, prepared_request()} ) -> {ok, map()} | {error, {recoverable_error, term()} | term()}. -do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) -> +do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) -> #{ pool_name := PoolName, max_retries := MaxRetries, @@ -301,7 +289,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI #{ request => {prepared_request, {Method, Path, Body}}, query_mode => sync, - resource_id => ResourceId + resource_id => PoolName } ), Request = to_ehttpc_request(State, Method, Path, Body), @@ -312,16 +300,15 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI RequestTTL, MaxRetries ), - handle_response(Response, ResourceId, _QueryMode = sync). + handle_response(Response, PoolName, _QueryMode = sync). -spec do_send_requests_async( state(), {prepared_request, prepared_request()}, - {ReplyFun :: function(), Args :: list()}, - resource_id() + {ReplyFun :: function(), Args :: list()} ) -> {ok, pid()}. do_send_requests_async( - State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId + State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs ) -> #{ pool_name := PoolName, @@ -332,7 +319,7 @@ do_send_requests_async( #{ request => {prepared_request, {Method, Path, Body}}, query_mode => async, - resource_id => ResourceId + resource_id => PoolName } ), Request = to_ehttpc_request(State, Method, Path, Body), @@ -342,7 +329,7 @@ do_send_requests_async( Method, Request, RequestTTL, - {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} + {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]} ), {ok, Worker}. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 7684d1469..bb9e75a9c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -31,29 +31,31 @@ -type ack_id() :: binary(). -type config() :: #{ ack_retry_interval := emqx_schema:timeout_duration_ms(), - connector_state := emqx_bridge_gcp_pubsub_connector:state(), + client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id => non_neg_integer(), hookpoint := binary(), instance_id := binary(), mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), + project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), subscription_id => subscription_id(), - topic => emqx_bridge_gcp_pubsub_connector:topic() + topic => emqx_bridge_gcp_pubsub_client:topic() }. -type state() :: #{ ack_retry_interval := emqx_schema:timeout_duration_ms(), ack_timer := undefined | reference(), async_workers := #{pid() => reference()}, - connector_state := emqx_bridge_gcp_pubsub_connector:state(), + client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id := non_neg_integer(), hookpoint := binary(), instance_id := binary(), mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), pending_acks => [ack_id()], + project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), pull_timer := undefined | reference(), subscription_id => subscription_id(), - topic => emqx_bridge_gcp_pubsub_connector:topic() + topic => emqx_bridge_gcp_pubsub_client:topic() }. -type decoded_message() :: map(). @@ -129,10 +131,11 @@ connect(Opts0) -> #{ ack_retry_interval := AckRetryInterval, bridge_name := BridgeName, - connector_state := ConnectorState, + client := Client, ecpool_worker_id := WorkerId, hookpoint := Hookpoint, instance_id := InstanceId, + project_id := ProjectId, pull_max_messages := PullMaxMessages, topic_mapping := TopicMapping } = Opts, @@ -141,13 +144,14 @@ connect(Opts0) -> {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList), Config = #{ ack_retry_interval => AckRetryInterval, - %% Note: the `connector_state' value here must be immutable and not changed by the + %% Note: the `client' value here must be immutable and not changed by the %% bridge during `on_get_status', since we have handed it over to the pull %% workers. - connector_state => ConnectorState, + client => Client, hookpoint => Hookpoint, instance_id => InstanceId, mqtt_config => MQTTConfig, + project_id => ProjectId, pull_max_messages => PullMaxMessages, topic => Topic, subscription_id => subscription_id(BridgeName, Topic) @@ -264,7 +268,7 @@ ensure_pull_timer(State) -> -spec ensure_subscription_exists(state()) -> ok | error. ensure_subscription_exists(State) -> #{ - connector_state := ConnectorState, + client := Client, instance_id := InstanceId, subscription_id := SubscriptionId, topic := Topic @@ -273,7 +277,7 @@ ensure_subscription_exists(State) -> Path = path(State, create), Body = body(State, create), PreparedRequest = {prepared_request, {Method, Path, Body}}, - Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState), + Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, #{status_code := 409}} -> %% already exists @@ -287,9 +291,7 @@ ensure_subscription_exists(State) -> Path1 = path(State, create), Body1 = body(State, patch_subscription), PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}}, - Res1 = emqx_bridge_gcp_pubsub_connector:on_query( - InstanceId, PreparedRequest1, ConnectorState - ), + Res1 = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client), ?SLOG(debug, #{ msg => "gcp_pubsub_consumer_worker_subscription_patch", instance_id => InstanceId, @@ -319,7 +321,7 @@ ensure_subscription_exists(State) -> %% We use async requests so that this process will be more responsive to system messages. do_pull_async(State) -> #{ - connector_state := ConnectorState, + client := Client, instance_id := InstanceId } = State, Method = post, @@ -327,11 +329,10 @@ do_pull_async(State) -> Body = body(State, pull), PreparedRequest = {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]}, - {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_connector:on_query_async( - InstanceId, + {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_client:query_async( PreparedRequest, ReplyFunAndArgs, - ConnectorState + Client ), ensure_async_worker_monitored(State, AsyncWorkerPid). @@ -361,15 +362,14 @@ acknowledge(State0 = #{pending_acks := []}) -> acknowledge(State0) -> State1 = State0#{ack_timer := undefined}, #{ - connector_state := ConnectorState, - instance_id := InstanceId, + client := Client, pending_acks := AckIds } = State1, Method = post, Path = path(State1, ack), Body = body(State1, ack, #{ack_ids => AckIds}), PreparedRequest = {prepared_request, {Method, Path, Body}}, - Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState), + Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}), @@ -384,14 +384,13 @@ acknowledge(State0) -> do_get_subscription(State) -> #{ - connector_state := ConnectorState, - instance_id := InstanceId + client := Client } = State, Method = get, Path = path(State, get_subscription), Body = body(State, get_subscription), PreparedRequest = {prepared_request, {Method, Path, Body}}, - Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState), + Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> ?SLOG(warning, #{ @@ -410,7 +409,7 @@ do_get_subscription(State) -> {error, Details} end. --spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_connector:topic()) -> subscription_id(). +-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_client:topic()) -> subscription_id(). subscription_id(BridgeName0, Topic) -> %% The real GCP PubSub accepts colons in subscription names, but its emulator %% doesn't... We currently validate bridge names to not include that character. The @@ -422,7 +421,7 @@ subscription_id(BridgeName0, Topic) -> -spec path(state(), pull | create | ack | get_subscription) -> binary(). path(State, Type) -> #{ - connector_state := #{project_id := ProjectId}, + client := #{project_id := ProjectId}, subscription_id := SubscriptionId } = State, SubscriptionResource = subscription_resource(ProjectId, SubscriptionId), @@ -444,7 +443,7 @@ body(State, pull) -> body(State, create) -> #{ ack_retry_interval := AckRetryInterval, - connector_state := #{project_id := ProjectId}, + project_id := ProjectId, topic := PubSubTopic } = State, TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>, @@ -457,7 +456,7 @@ body(State, create) -> body(State, patch_subscription) -> #{ ack_retry_interval := AckRetryInterval, - connector_state := #{project_id := ProjectId}, + project_id := ProjectId, topic := PubSubTopic, subscription_id := SubscriptionId } = State, @@ -484,7 +483,7 @@ body(_State, ack, Opts) -> JSON = #{<<"ackIds">> => AckIds}, emqx_utils_json:encode(JSON). --spec subscription_resource(emqx_bridge_gcp_pubsub_connector:project_id(), subscription_id()) -> +-spec subscription_resource(emqx_bridge_gcp_pubsub_client:project_id(), subscription_id()) -> binary(). subscription_resource(ProjectId, SubscriptionId) -> <<"projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index e004875f9..88b57be1e 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -29,11 +29,11 @@ max_retries := non_neg_integer(), pool_size := non_neg_integer(), resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, - service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(), + service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), any() => term() }. -type state() :: #{ - connector_state := emqx_bridge_gcp_pubsub_connector:state() + client := emqx_bridge_gcp_pubsub_client:state() }. -export_type([mqtt_config/0]). @@ -52,24 +52,21 @@ query_mode(_Config) -> no_queries. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. on_start(InstanceId, Config) -> - case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of - {ok, ConnectorState} -> - start_consumers(InstanceId, ConnectorState, Config); + case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of + {ok, Client} -> + start_consumers(InstanceId, Client, Config); Error -> Error end. -spec on_stop(resource_id(), state()) -> ok | {error, term()}. -on_stop(InstanceId, #{connector_state := ConnectorState}) -> +on_stop(InstanceId, _State) -> ok = stop_consumers(InstanceId), - emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState); -on_stop(InstanceId, undefined = _State) -> - ok = stop_consumers(InstanceId), - emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined). + emqx_bridge_gcp_pubsub_client:stop(InstanceId). -spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(InstanceId, _State) -> - %% Note: do *not* alter the `connector_state' value here. It must be immutable, since + %% Note: do *not* alter the `client' value here. It must be immutable, since %% we have handed it over to the pull workers. case emqx_resource_pool:health_check_workers( @@ -85,11 +82,12 @@ on_get_status(InstanceId, _State) -> %% Internal fns %%------------------------------------------------------------------------------------------------- -start_consumers(InstanceId, ConnectorState, Config) -> +start_consumers(InstanceId, Client, Config) -> #{ bridge_name := BridgeName, consumer := ConsumerConfig0, - hookpoint := Hookpoint + hookpoint := Hookpoint, + service_account_json := #{project_id := ProjectId} } = Config, ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), TopicMapping = maps:get(topic_mapping, ConsumerConfig1), @@ -98,18 +96,19 @@ start_consumers(InstanceId, ConnectorState, Config) -> ConsumerConfig = ConsumerConfig1#{ auto_reconnect => ?AUTO_RECONNECT_S, bridge_name => BridgeName, - connector_state => ConnectorState, + client => Client, hookpoint => Hookpoint, instance_id => InstanceId, - pool_size => PoolSize + pool_size => PoolSize, + project_id => ProjectId }, ConsumerOpts = maps:to_list(ConsumerConfig), %% FIXME: mark as unhealthy if topics do not exist! - case validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) of + case validate_pubsub_topics(TopicMapping, Client) of ok -> ok; error -> - _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState), + _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), throw( "GCP PubSub topics are invalid. Please check the logs, check if the " "topic exists in GCP and if the service account has permissions to use them." @@ -120,12 +119,12 @@ start_consumers(InstanceId, ConnectorState, Config) -> of ok -> State = #{ - connector_state => ConnectorState, + client => Client, pool_name => InstanceId }, {ok, State}; {error, Reason} -> - _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState), + _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), {error, Reason} end. @@ -163,23 +162,23 @@ convert_topic_mapping(TopicMappingList) -> TopicMappingList ). -validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) -> +validate_pubsub_topics(TopicMapping, Client) -> PubSubTopics = maps:keys(TopicMapping), - do_validate_pubsub_topics(InstanceId, ConnectorState, PubSubTopics). + do_validate_pubsub_topics(Client, PubSubTopics). -do_validate_pubsub_topics(InstanceId, ConnectorState, [Topic | Rest]) -> - case check_for_topic_existence(InstanceId, Topic, ConnectorState) of +do_validate_pubsub_topics(Client, [Topic | Rest]) -> + case check_for_topic_existence(Topic, Client) of ok -> - do_validate_pubsub_topics(InstanceId, ConnectorState, Rest); + do_validate_pubsub_topics(Client, Rest); {error, _} -> error end; -do_validate_pubsub_topics(_InstanceId, _ConnectorState, []) -> +do_validate_pubsub_topics(_Client, []) -> %% we already validate that the mapping is not empty in the config schema. ok. -check_for_topic_existence(InstanceId, Topic, ConnectorState) -> - Res = emqx_bridge_gcp_pubsub_connector:get_topic(InstanceId, Topic, ConnectorState), +check_for_topic_existence(Topic, Client) -> + Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client), case Res of {ok, _} -> ok; diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 1f198af23..d2469160b 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -13,17 +13,18 @@ max_retries := non_neg_integer(), pubsub_topic := binary(), resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, - service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(), + service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), any() => term() }. -type state() :: #{ - connector_state := emqx_bridge_gcp_pubsub_connector:state(), + client := emqx_bridge_gcp_pubsub_client:state(), payload_template := emqx_placeholder:tmpl_token(), + project_id := emqx_bridge_gcp_pubsub_client:project_id(), pubsub_topic := binary() }. --type headers() :: emqx_bridge_gcp_pubsub_connector:headers(). --type body() :: emqx_bridge_gcp_pubsub_connector:body(). --type status_code() :: emqx_bridge_gcp_pubsub_connector:status_code(). +-type headers() :: emqx_bridge_gcp_pubsub_client:headers(). +-type body() :: emqx_bridge_gcp_pubsub_client:body(). +-type status_code() :: emqx_bridge_gcp_pubsub_client:status_code(). %% `emqx_resource' API -export([ @@ -50,15 +51,21 @@ query_mode(_Config) -> async. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. on_start(InstanceId, Config) -> + ?SLOG(info, #{ + msg => "starting_gcp_pubsub_bridge", + config => Config + }), #{ payload_template := PayloadTemplate, - pubsub_topic := PubSubTopic + pubsub_topic := PubSubTopic, + service_account_json := #{project_id := ProjectId} } = Config, - case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of - {ok, ConnectorState} -> + case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of + {ok, Client} -> State = #{ - connector_state => ConnectorState, + client => Client, payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + project_id => ProjectId, pubsub_topic => PubSubTopic }, {ok, State}; @@ -67,22 +74,19 @@ on_start(InstanceId, Config) -> end. -spec on_stop(resource_id(), state()) -> ok | {error, term()}. -on_stop(InstanceId, #{connector_state := ConnectorState}) -> - emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState); -on_stop(InstanceId, undefined = _State) -> - emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined). +on_stop(InstanceId, _State) -> + emqx_bridge_gcp_pubsub_client:stop(InstanceId). -spec on_get_status(resource_id(), state()) -> connected | disconnected. -on_get_status(InstanceId, #{connector_state := ConnectorState} = _State) -> - emqx_bridge_gcp_pubsub_connector:on_get_status(InstanceId, ConnectorState). +on_get_status(_InstanceId, #{client := Client} = _State) -> + emqx_bridge_gcp_pubsub_client:get_status(Client). -spec on_query( resource_id(), {send_message, map()}, state() ) -> - {ok, status_code(), headers()} - | {ok, status_code(), headers(), body()} + {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. on_query(ResourceId, {send_message, Selected}, State) -> @@ -107,15 +111,14 @@ on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> "gcp_pubsub_received", #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). + do_send_requests_async(State, Requests, ReplyFunAndArgs). -spec on_batch_query( resource_id(), [{send_message, map()}], state() ) -> - {ok, status_code(), headers()} - | {ok, status_code(), headers(), body()} + {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. on_batch_query(ResourceId, Requests, State) -> @@ -138,7 +141,7 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> "gcp_pubsub_received", #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). + do_send_requests_async(State, Requests, ReplyFunAndArgs). %%------------------------------------------------------------------------------------------------- %% Helper fns @@ -154,7 +157,7 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> | {error, {recoverable_error, term()}} | {error, term()}. do_send_requests_sync(State, Requests, InstanceId) -> - #{connector_state := ConnectorState} = State, + #{client := Client} = State, Payloads = lists:map( fun({send_message, Selected}) -> @@ -166,18 +169,17 @@ do_send_requests_sync(State, Requests, InstanceId) -> Path = publish_path(State), Method = post, Request = {prepared_request, {Method, Path, Body}}, - Result = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState), + Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), QueryMode = sync, handle_result(Result, Request, QueryMode, InstanceId). -spec do_send_requests_async( state(), [{send_message, map()}], - {ReplyFun :: function(), Args :: list()}, - resource_id() + {ReplyFun :: function(), Args :: list()} ) -> {ok, pid()}. -do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) -> - #{connector_state := ConnectorState} = State, +do_send_requests_async(State, Requests, ReplyFunAndArgs0) -> + #{client := Client} = State, Payloads = lists:map( fun({send_message, Selected}) -> @@ -190,8 +192,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) -> Method = post, Request = {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, - emqx_bridge_gcp_pubsub_connector:on_query_async( - InstanceId, Request, ReplyFunAndArgs, ConnectorState + emqx_bridge_gcp_pubsub_client:query_async( + Request, ReplyFunAndArgs, Client ). -spec encode_payload(state(), Selected :: map()) -> #{data := binary()}. @@ -210,7 +212,7 @@ to_pubsub_request(Payloads) -> -spec publish_path(state()) -> binary(). publish_path( _State = #{ - connector_state := #{project_id := ProjectId}, + project_id := ProjectId, pubsub_topic := PubSubTopic } ) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 6d81b88d2..2e32f630a 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -44,14 +44,14 @@ init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite(), HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr, true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), - ConnectorState = start_control_connector(), + Client = start_control_connector(), [ {proxy_name, ProxyName}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {gcp_emulator_host, GCPEmulatorHost}, {gcp_emulator_port, GCPEmulatorPort}, - {connector_state, ConnectorState} + {client, Client} | Config ]; false -> @@ -64,8 +64,8 @@ init_per_suite(Config) -> end. end_per_suite(Config) -> - ConnectorState = ?config(connector_state, Config), - stop_control_connector(ConnectorState), + Client = ?config(client, Config), + stop_control_connector(Client), emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), @@ -229,14 +229,13 @@ ensure_topics(Config) -> ensure_topic(Config, Topic) -> ProjectId = ?config(project_id, Config), - ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config), + Client = ?config(client, Config), Method = put, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<"{}">>, - Res = emqx_bridge_gcp_pubsub_connector:on_query( - PoolName, + Res = emqx_bridge_gcp_pubsub_client:query_sync( {prepared_request, {Method, Path, Body}}, - ConnectorState + Client ), case Res of {ok, _} -> @@ -259,16 +258,15 @@ start_control_connector() -> service_account_json => ServiceAccount }, PoolName = <<"control_connector">>, - {ok, ConnectorState} = emqx_bridge_gcp_pubsub_connector:on_start(PoolName, ConnectorConfig), - ConnectorState. + {ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig), + Client. -stop_control_connector(ConnectorState) -> - #{pool_name := PoolName} = ConnectorState, - ok = emqx_bridge_gcp_pubsub_connector:on_stop(PoolName, ConnectorState), +stop_control_connector(Client) -> + ok = emqx_bridge_gcp_pubsub_client:stop(Client), ok. pubsub_publish(Config, Topic, Messages0) -> - ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config), + Client = ?config(client, Config), ProjectId = ?config(project_id, Config), Method = post, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary, ":publish">>, @@ -287,10 +285,9 @@ pubsub_publish(Config, Topic, Messages0) -> Messages0 ), Body = emqx_utils_json:encode(#{<<"messages">> => Messages}), - {ok, _} = emqx_bridge_gcp_pubsub_connector:on_query( - PoolName, + {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( {prepared_request, {Method, Path, Body}}, - ConnectorState + Client ), ok. @@ -688,3 +685,4 @@ t_bridge_rule_action_source(Config) -> %% * connection down during ack %% * topic deleted while consumer is running %% * subscription deleted while consumer is running +%% * ensure client is terminated when bridge stops diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 5b1e7c6c6..67e4f52d1 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -74,7 +74,7 @@ init_per_suite(Config) -> ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), - persistent_term:put({emqx_bridge_gcp_pubsub_connector, transport}, tls), + persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls), Config. end_per_suite(_Config) -> @@ -82,7 +82,7 @@ end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), _ = application:stop(emqx_connector), - persistent_term:erase({emqx_bridge_gcp_pubsub_connector, transport}), + persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}), ok. init_per_group(sync_query, Config) ->