diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index f61817095..6afccccc0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -20,17 +20,16 @@ on_stop/2, on_query/3, on_query_async/4, - on_batch_query/3, - on_batch_query_async/4, on_get_status/2 ]). -export([reply_delegator/3]). +-export([get_jwt_authorization_header/1]). + -type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json(). -type config() :: #{ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), - pubsub_topic := binary(), resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() @@ -39,15 +38,18 @@ connect_timeout := timer:time(), jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), - payload_template := emqx_placeholder:tmpl_token(), pool_name := binary(), project_id := binary(), - pubsub_topic := binary(), request_ttl := infinity | timer:time() }. -type headers() :: [{binary(), iodata()}]. -type body() :: iodata(). -type status_code() :: 100..599. +-type method() :: post. +-type path() :: binary(). +-type prepared_request() :: {method(), path(), body()}. + +-export_type([service_account_json/0, state/0, headers/0, body/0, status_code/0]). -define(DEFAULT_PIPELINE_SIZE, 100). @@ -63,9 +65,7 @@ on_start( #{ connect_timeout := ConnectTimeout, max_retries := MaxRetries, - payload_template := PayloadTemplate, pool_size := PoolSize, - pubsub_topic := PubSubTopic, resource_opts := #{request_ttl := RequestTTL} } = Config ) -> @@ -101,10 +101,8 @@ on_start( connect_timeout => ConnectTimeout, jwt_config => JWTConfig, max_retries => MaxRetries, - payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), pool_name => ResourceId, project_id => ProjectId, - pubsub_topic => PubSubTopic, request_ttl => RequestTTL }, ?tp( @@ -130,7 +128,7 @@ on_start( {error, Reason} end. --spec on_stop(resource_id(), state()) -> ok | {error, term()}. +-spec on_stop(resource_id(), state() | undefined) -> ok | {error, term()}. on_stop(ResourceId, _State) -> ?tp(gcp_pubsub_stop, #{resource_id => ResourceId}), ?SLOG(info, #{ @@ -149,67 +147,39 @@ on_stop(ResourceId, _State) -> -spec on_query( resource_id(), - {send_message, map()}, + {prepared_request, prepared_request()}, state() ) -> {ok, status_code(), headers()} | {ok, status_code(), headers(), body()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {send_message, Selected}, State) -> - Requests = [{send_message, Selected}], +on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) -> ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{requests => PreparedRequest, connector => ResourceId, state => State} ), - do_send_requests_sync(State, Requests, ResourceId). + do_send_requests_sync(State, {prepared_request, PreparedRequest}, ResourceId). -spec on_query_async( resource_id(), - {send_message, map()}, + {prepared_request, prepared_request()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> - Requests = [{send_message, Selected}], - ?TRACE( - "QUERY_ASYNC", - "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} - ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). - --spec on_batch_query( - resource_id(), - [{send_message, map()}], - state() +on_query_async( + ResourceId, + {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, + ReplyFunAndArgs, + State ) -> - {ok, status_code(), headers()} - | {ok, status_code(), headers(), body()} - | {error, {recoverable_error, term()}} - | {error, term()}. -on_batch_query(ResourceId, Requests, State) -> - ?TRACE( - "QUERY_SYNC", - "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} - ), - do_send_requests_sync(State, Requests, ResourceId). - --spec on_batch_query_async( - resource_id(), - [{send_message, map()}], - {ReplyFun :: function(), Args :: list()}, - state() -) -> {ok, pid()}. -on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => Requests, connector => ResourceId, state => State} + #{requests => PreparedRequest, connector => ResourceId, state => State} ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). + do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs, ResourceId). -spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> @@ -286,28 +256,6 @@ parse_jwt_config(ResourceId, #{ project_id => ProjectId }. --spec encode_payload(state(), Selected :: map()) -> #{data := binary()}. -encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> - Interpolated = - case PayloadTemplate of - [] -> emqx_utils_json:encode(Selected); - _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected) - end, - #{data => base64:encode(Interpolated)}. - --spec to_pubsub_request([#{data := binary()}]) -> binary(). -to_pubsub_request(Payloads) -> - emqx_utils_json:encode(#{messages => Payloads}). - --spec publish_path(state()) -> binary(). -publish_path( - _State = #{ - project_id := ProjectId, - pubsub_topic := PubSubTopic - } -) -> - <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. - -spec get_jwt_authorization_header(emqx_connector_jwt:jwt_config()) -> [{binary(), binary()}]. get_jwt_authorization_header(JWTConfig) -> JWT = emqx_connector_jwt:ensure_jwt(JWTConfig), @@ -315,14 +263,14 @@ get_jwt_authorization_header(JWTConfig) -> -spec do_send_requests_sync( state(), - [{send_message, map()}], + {prepared_request, prepared_request()}, resource_id() ) -> {ok, status_code(), headers()} | {ok, status_code(), headers(), body()} | {error, {recoverable_error, term()}} | {error, term()}. -do_send_requests_sync(State, Requests, ResourceId) -> +do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) -> #{ jwt_config := JWTConfig, pool_name := PoolName, @@ -333,21 +281,10 @@ do_send_requests_sync(State, Requests, ResourceId) -> gcp_pubsub_bridge_do_send_requests, #{ query_mode => sync, - resource_id => ResourceId, - requests => Requests + resource_id => ResourceId } ), Headers = get_jwt_authorization_header(JWTConfig), - Payloads = - lists:map( - fun({send_message, Selected}) -> - encode_payload(State, Selected) - end, - Requests - ), - Body = to_pubsub_request(Payloads), - Path = publish_path(State), - Method = post, Request = {Path, Headers, Body}, case ehttpc:request( @@ -443,11 +380,13 @@ do_send_requests_sync(State, Requests, ResourceId) -> -spec do_send_requests_async( state(), - [{send_message, map()}], + {prepared_request, prepared_request()}, {ReplyFun :: function(), Args :: list()}, resource_id() ) -> {ok, pid()}. -do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> +do_send_requests_async( + State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId +) -> #{ jwt_config := JWTConfig, pool_name := PoolName, @@ -457,21 +396,10 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> gcp_pubsub_bridge_do_send_requests, #{ query_mode => async, - resource_id => ResourceId, - requests => Requests + resource_id => ResourceId } ), Headers = get_jwt_authorization_header(JWTConfig), - Payloads = - lists:map( - fun({send_message, Selected}) -> - encode_payload(State, Selected) - end, - Requests - ), - Body = to_pubsub_request(Payloads), - Path = publish_path(State), - Method = post, Request = {Path, Headers, Body}, Worker = ehttpc_pool:pick_worker(PoolName), ok = ehttpc:request_async( diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl new file mode 100644 index 000000000..840a96ac1 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -0,0 +1,211 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_impl_producer). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-type config() :: #{ + connect_timeout := emqx_schema:duration_ms(), + max_retries := non_neg_integer(), + pubsub_topic := binary(), + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, + service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(), + any() => term() +}. +-type state() :: #{ + connector_state := emqx_bridge_gcp_pubsub_connector:state(), + payload_template := emqx_placeholder:tmpl_token(), + pubsub_topic := binary() +}. +-type headers() :: emqx_bridge_gcp_pubsub_connector:headers(). +-type body() :: emqx_bridge_gcp_pubsub_connector:body(). +-type status_code() :: emqx_bridge_gcp_pubsub_connector:status_code(). + +%% `emqx_resource' API +-export([ + callback_mode/0, + query_mode/1, + on_start/2, + on_stop/2, + on_query/3, + on_query_async/4, + on_batch_query/3, + on_batch_query_async/4, + on_get_status/2 +]). + +%%------------------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------------------- + +callback_mode() -> async_if_possible. + +query_mode(_Config) -> async. + +-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. +on_start(InstanceId, Config) -> + #{ + payload_template := PayloadTemplate, + pubsub_topic := PubSubTopic + } = Config, + case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of + {ok, ConnectorState} -> + State = #{ + connector_state => ConnectorState, + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + pubsub_topic => PubSubTopic + }, + {ok, State}; + Error -> + Error + end. + +-spec on_stop(resource_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, #{connector_state := ConnectorState}) -> + emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState); +on_stop(InstanceId, undefined = _State) -> + emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined). + +-spec on_get_status(resource_id(), state()) -> connected | disconnected. +on_get_status(InstanceId, #{connector_state := ConnectorState} = _State) -> + emqx_bridge_gcp_pubsub_connector:on_get_status(InstanceId, ConnectorState). + +-spec on_query( + resource_id(), + {send_message, map()}, + state() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_query(ResourceId, {send_message, Selected}, State) -> + Requests = [{send_message, Selected}], + ?TRACE( + "QUERY_SYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => ResourceId, state => State} + ), + do_send_requests_sync(State, Requests, ResourceId). + +-spec on_query_async( + resource_id(), + {send_message, map()}, + {ReplyFun :: function(), Args :: list()}, + state() +) -> {ok, pid()}. +on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> + Requests = [{send_message, Selected}], + ?TRACE( + "QUERY_ASYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => ResourceId, state => State} + ), + do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). + +-spec on_batch_query( + resource_id(), + [{send_message, map()}], + state() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_batch_query(ResourceId, Requests, State) -> + ?TRACE( + "QUERY_SYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => ResourceId, state => State} + ), + do_send_requests_sync(State, Requests, ResourceId). + +-spec on_batch_query_async( + resource_id(), + [{send_message, map()}], + {ReplyFun :: function(), Args :: list()}, + state() +) -> {ok, pid()}. +on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> + ?TRACE( + "QUERY_ASYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => ResourceId, state => State} + ), + do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +-spec do_send_requests_sync( + state(), + [{send_message, map()}], + resource_id() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +do_send_requests_sync(State, Requests, InstanceId) -> + #{connector_state := ConnectorState} = State, + Payloads = + lists:map( + fun({send_message, Selected}) -> + encode_payload(State, Selected) + end, + Requests + ), + Body = to_pubsub_request(Payloads), + Path = publish_path(State), + Method = post, + Request = {prepared_request, {Method, Path, Body}}, + emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState). + +-spec do_send_requests_async( + state(), + [{send_message, map()}], + {ReplyFun :: function(), Args :: list()}, + resource_id() +) -> {ok, pid()}. +do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) -> + #{connector_state := ConnectorState} = State, + Payloads = + lists:map( + fun({send_message, Selected}) -> + encode_payload(State, Selected) + end, + Requests + ), + Body = to_pubsub_request(Payloads), + Path = publish_path(State), + Method = post, + Request = {prepared_request, {Method, Path, Body}}, + emqx_bridge_gcp_pubsub_connector:on_query_async( + InstanceId, Request, ReplyFunAndArgs, ConnectorState + ). + +-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}. +encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> + Interpolated = + case PayloadTemplate of + [] -> emqx_utils_json:encode(Selected); + _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected) + end, + #{data => base64:encode(Interpolated)}. + +-spec to_pubsub_request([#{data := binary()}]) -> binary(). +to_pubsub_request(Payloads) -> + emqx_utils_json:encode(#{messages => Payloads}). + +-spec publish_path(state()) -> binary(). +publish_path( + _State = #{ + connector_state := #{project_id := ProjectId}, + pubsub_topic := PubSubTopic + } +) -> + <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 4dd2682fb..29849db41 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -38,13 +38,13 @@ groups() -> {group, sync_query}, {group, async_query} ], - ResourceGroups = [{group, gcp_pubsub}], + SyncTCs = MatrixTCs, + AsyncTCs = MatrixTCs -- only_sync_tests(), [ {with_batch, SynchronyGroups}, {without_batch, SynchronyGroups}, - {sync_query, ResourceGroups}, - {async_query, ResourceGroups}, - {gcp_pubsub, MatrixTCs} + {sync_query, SyncTCs}, + {async_query, AsyncTCs} ]. %% these should not be influenced by the batch/no @@ -66,6 +66,9 @@ single_config_tests() -> t_on_start_ehttpc_pool_already_started ]. +only_sync_tests() -> + [t_query_sync]. + init_per_suite(Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), @@ -1464,3 +1467,30 @@ t_on_start_ehttpc_pool_start_failure(Config) -> end ), ok. + +%% Usually not called, since the bridge has `async_if_possible' callback mode. +t_query_sync(Config) -> + BatchSize0 = ?config(batch_size, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + BatchSize = min(2, BatchSize0), + Topic = <<"t/topic">>, + Payload = <<"payload">>, + ?check_trace( + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_impl_producer, + callback_mode, + fun() -> always_sync end, + fun() -> + {ok, _} = create_bridge(Config), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Message = emqx_message:make(Topic, Payload), + emqx_utils:pmap(fun(_) -> emqx:publish(Message) end, lists:seq(1, BatchSize)), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertEqual(BatchSize, length(DecodedMessages)), + ok + end + ), + [] + ), + ok. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 42e10179e..e56a96e4f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -91,7 +91,7 @@ resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer; resource_type(kafka) -> emqx_bridge_kafka_impl_producer; resource_type(cassandra) -> emqx_bridge_cassandra_connector; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; -resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_connector; +resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(mongodb_rs) -> emqx_ee_connector_mongodb; resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb; resource_type(mongodb_single) -> emqx_ee_connector_mongodb;