refactor(gcp_pubsub): split connector into producer and reusable parts

This commit is contained in:
Thales Macedo Garitezi 2023-06-13 16:55:05 -03:00
parent 5ef19ecf58
commit 5375421954
4 changed files with 274 additions and 105 deletions

View File

@ -20,17 +20,16 @@
on_stop/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2
]).
-export([reply_delegator/3]).
-export([get_jwt_authorization_header/1]).
-type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := service_account_json(),
any() => term()
@ -39,15 +38,18 @@
connect_timeout := timer:time(),
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
payload_template := emqx_placeholder:tmpl_token(),
pool_name := binary(),
project_id := binary(),
pubsub_topic := binary(),
request_ttl := infinity | timer:time()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
-type status_code() :: 100..599.
-type method() :: post.
-type path() :: binary().
-type prepared_request() :: {method(), path(), body()}.
-export_type([service_account_json/0, state/0, headers/0, body/0, status_code/0]).
-define(DEFAULT_PIPELINE_SIZE, 100).
@ -63,9 +65,7 @@ on_start(
#{
connect_timeout := ConnectTimeout,
max_retries := MaxRetries,
payload_template := PayloadTemplate,
pool_size := PoolSize,
pubsub_topic := PubSubTopic,
resource_opts := #{request_ttl := RequestTTL}
} = Config
) ->
@ -101,10 +101,8 @@ on_start(
connect_timeout => ConnectTimeout,
jwt_config => JWTConfig,
max_retries => MaxRetries,
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pool_name => ResourceId,
project_id => ProjectId,
pubsub_topic => PubSubTopic,
request_ttl => RequestTTL
},
?tp(
@ -130,7 +128,7 @@ on_start(
{error, Reason}
end.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
-spec on_stop(resource_id(), state() | undefined) -> ok | {error, term()}.
on_stop(ResourceId, _State) ->
?tp(gcp_pubsub_stop, #{resource_id => ResourceId}),
?SLOG(info, #{
@ -149,67 +147,39 @@ on_stop(ResourceId, _State) ->
-spec on_query(
resource_id(),
{send_message, map()},
{prepared_request, prepared_request()},
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(ResourceId, {send_message, Selected}, State) ->
Requests = [{send_message, Selected}],
on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{requests => PreparedRequest, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, ResourceId).
do_send_requests_sync(State, {prepared_request, PreparedRequest}, ResourceId).
-spec on_query_async(
resource_id(),
{send_message, map()},
{prepared_request, prepared_request()},
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-spec on_batch_query(
resource_id(),
[{send_message, map()}],
state()
on_query_async(
ResourceId,
{prepared_request, PreparedRequest = {_Method, _Path, _Body}},
ReplyFunAndArgs,
State
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(ResourceId, Requests, State) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, ResourceId).
-spec on_batch_query_async(
resource_id(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
#{requests => PreparedRequest, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs, ResourceId).
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
@ -286,28 +256,6 @@ parse_jwt_config(ResourceId, #{
project_id => ProjectId
}.
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected);
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
end,
#{data => base64:encode(Interpolated)}.
-spec to_pubsub_request([#{data := binary()}]) -> binary().
to_pubsub_request(Payloads) ->
emqx_utils_json:encode(#{messages => Payloads}).
-spec publish_path(state()) -> binary().
publish_path(
_State = #{
project_id := ProjectId,
pubsub_topic := PubSubTopic
}
) ->
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
-spec get_jwt_authorization_header(emqx_connector_jwt:jwt_config()) -> [{binary(), binary()}].
get_jwt_authorization_header(JWTConfig) ->
JWT = emqx_connector_jwt:ensure_jwt(JWTConfig),
@ -315,14 +263,14 @@ get_jwt_authorization_header(JWTConfig) ->
-spec do_send_requests_sync(
state(),
[{send_message, map()}],
{prepared_request, prepared_request()},
resource_id()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
do_send_requests_sync(State, Requests, ResourceId) ->
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) ->
#{
jwt_config := JWTConfig,
pool_name := PoolName,
@ -333,21 +281,10 @@ do_send_requests_sync(State, Requests, ResourceId) ->
gcp_pubsub_bridge_do_send_requests,
#{
query_mode => sync,
resource_id => ResourceId,
requests => Requests
resource_id => ResourceId
}
),
Headers = get_jwt_authorization_header(JWTConfig),
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {Path, Headers, Body},
case
ehttpc:request(
@ -443,11 +380,13 @@ do_send_requests_sync(State, Requests, ResourceId) ->
-spec do_send_requests_async(
state(),
[{send_message, map()}],
{prepared_request, prepared_request()},
{ReplyFun :: function(), Args :: list()},
resource_id()
) -> {ok, pid()}.
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
do_send_requests_async(
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId
) ->
#{
jwt_config := JWTConfig,
pool_name := PoolName,
@ -457,21 +396,10 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
gcp_pubsub_bridge_do_send_requests,
#{
query_mode => async,
resource_id => ResourceId,
requests => Requests
resource_id => ResourceId
}
),
Headers = get_jwt_authorization_header(JWTConfig),
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {Path, Headers, Body},
Worker = ehttpc_pool:pick_worker(PoolName),
ok = ehttpc:request_async(

View File

@ -0,0 +1,211 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_impl_producer).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(),
any() => term()
}.
-type state() :: #{
connector_state := emqx_bridge_gcp_pubsub_connector:state(),
payload_template := emqx_placeholder:tmpl_token(),
pubsub_topic := binary()
}.
-type headers() :: emqx_bridge_gcp_pubsub_connector:headers().
-type body() :: emqx_bridge_gcp_pubsub_connector:body().
-type status_code() :: emqx_bridge_gcp_pubsub_connector:status_code().
%% `emqx_resource' API
-export([
callback_mode/0,
query_mode/1,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2
]).
%%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API
%%-------------------------------------------------------------------------------------------------
callback_mode() -> async_if_possible.
query_mode(_Config) -> async.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, Config) ->
#{
payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic
} = Config,
case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of
{ok, ConnectorState} ->
State = #{
connector_state => ConnectorState,
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pubsub_topic => PubSubTopic
},
{ok, State};
Error ->
Error
end.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
on_stop(InstanceId, #{connector_state := ConnectorState}) ->
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState);
on_stop(InstanceId, undefined = _State) ->
emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined).
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, #{connector_state := ConnectorState} = _State) ->
emqx_bridge_gcp_pubsub_connector:on_get_status(InstanceId, ConnectorState).
-spec on_query(
resource_id(),
{send_message, map()},
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(ResourceId, {send_message, Selected}, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, ResourceId).
-spec on_query_async(
resource_id(),
{send_message, map()},
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-spec on_batch_query(
resource_id(),
[{send_message, map()}],
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(ResourceId, Requests, State) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, ResourceId).
-spec on_batch_query_async(
resource_id(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
-spec do_send_requests_sync(
state(),
[{send_message, map()}],
resource_id()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
do_send_requests_sync(State, Requests, InstanceId) ->
#{connector_state := ConnectorState} = State,
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {prepared_request, {Method, Path, Body}},
emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState).
-spec do_send_requests_async(
state(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
resource_id()
) -> {ok, pid()}.
do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) ->
#{connector_state := ConnectorState} = State,
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {prepared_request, {Method, Path, Body}},
emqx_bridge_gcp_pubsub_connector:on_query_async(
InstanceId, Request, ReplyFunAndArgs, ConnectorState
).
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected);
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
end,
#{data => base64:encode(Interpolated)}.
-spec to_pubsub_request([#{data := binary()}]) -> binary().
to_pubsub_request(Payloads) ->
emqx_utils_json:encode(#{messages => Payloads}).
-spec publish_path(state()) -> binary().
publish_path(
_State = #{
connector_state := #{project_id := ProjectId},
pubsub_topic := PubSubTopic
}
) ->
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.

View File

@ -38,13 +38,13 @@ groups() ->
{group, sync_query},
{group, async_query}
],
ResourceGroups = [{group, gcp_pubsub}],
SyncTCs = MatrixTCs,
AsyncTCs = MatrixTCs -- only_sync_tests(),
[
{with_batch, SynchronyGroups},
{without_batch, SynchronyGroups},
{sync_query, ResourceGroups},
{async_query, ResourceGroups},
{gcp_pubsub, MatrixTCs}
{sync_query, SyncTCs},
{async_query, AsyncTCs}
].
%% these should not be influenced by the batch/no
@ -66,6 +66,9 @@ single_config_tests() ->
t_on_start_ehttpc_pool_already_started
].
only_sync_tests() ->
[t_query_sync].
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
@ -1464,3 +1467,30 @@ t_on_start_ehttpc_pool_start_failure(Config) ->
end
),
ok.
%% Usually not called, since the bridge has `async_if_possible' callback mode.
t_query_sync(Config) ->
BatchSize0 = ?config(batch_size, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
BatchSize = min(2, BatchSize0),
Topic = <<"t/topic">>,
Payload = <<"payload">>,
?check_trace(
emqx_common_test_helpers:with_mock(
emqx_bridge_gcp_pubsub_impl_producer,
callback_mode,
fun() -> always_sync end,
fun() ->
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Message = emqx_message:make(Topic, Payload),
emqx_utils:pmap(fun(_) -> emqx:publish(Message) end, lists:seq(1, BatchSize)),
DecodedMessages = assert_http_request(ServiceAccountJSON),
?assertEqual(BatchSize, length(DecodedMessages)),
ok
end
),
[]
),
ok.

View File

@ -91,7 +91,7 @@ resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer;
resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
resource_type(cassandra) -> emqx_bridge_cassandra_connector;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_connector;
resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer;
resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;