feat(gcp_pubsub_producer): migrate GCP PubSub producer to actions
Fixes https://emqx.atlassian.net/browse/EMQX-11157
This commit is contained in:
parent
6c9a8461f7
commit
9e1796ec4f
|
@ -75,6 +75,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
[
|
||||
emqx_bridge_azure_event_hub_action_info,
|
||||
emqx_bridge_confluent_producer_action_info,
|
||||
emqx_bridge_gcp_pubsub_producer_action_info,
|
||||
emqx_bridge_kafka_action_info,
|
||||
emqx_bridge_syskeeper_action_info
|
||||
].
|
||||
|
|
|
@ -40,7 +40,11 @@
|
|||
|
||||
-export([types/0, types_sc/0]).
|
||||
|
||||
-export([make_producer_action_schema/1, make_consumer_action_schema/1]).
|
||||
-export([
|
||||
make_producer_action_schema/1,
|
||||
make_consumer_action_schema/1,
|
||||
top_level_common_action_keys/0
|
||||
]).
|
||||
|
||||
-export_type([action_type/0]).
|
||||
|
||||
|
@ -130,6 +134,8 @@ registered_schema_fields() ->
|
|||
|
||||
desc(actions) ->
|
||||
?DESC("desc_bridges_v2");
|
||||
desc(resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "resource_opts");
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
|
@ -154,6 +160,16 @@ examples(Method) ->
|
|||
SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
|
||||
lists:foldl(Fun, #{}, SchemaModules).
|
||||
|
||||
top_level_common_action_keys() ->
|
||||
[
|
||||
<<"connector">>,
|
||||
<<"description">>,
|
||||
<<"enable">>,
|
||||
<<"local_topic">>,
|
||||
<<"parameters">>,
|
||||
<<"resource_opts">>
|
||||
].
|
||||
|
||||
%%======================================================================================
|
||||
%% Helper functions for making HOCON Schema
|
||||
%%======================================================================================
|
||||
|
@ -174,7 +190,10 @@ make_consumer_action_schema(ActionParametersRef) ->
|
|||
{description, emqx_schema:description_schema()},
|
||||
{parameters, ActionParametersRef},
|
||||
{resource_opts,
|
||||
mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
|
||||
mk(ref(?MODULE, resource_opts), #{
|
||||
default => #{},
|
||||
desc => ?DESC(emqx_resource_schema, "resource_opts")
|
||||
})}
|
||||
].
|
||||
|
||||
-ifdef(TEST).
|
||||
|
@ -196,7 +215,7 @@ schema_homogeneous_test() ->
|
|||
|
||||
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
|
||||
Fields = Module:fields(TypeName),
|
||||
ExpectedFieldNames = common_field_names(),
|
||||
ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
|
||||
MissingFileds = lists:filter(
|
||||
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
|
||||
),
|
||||
|
@ -211,9 +230,4 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
|
|||
}}
|
||||
end.
|
||||
|
||||
common_field_names() ->
|
||||
[
|
||||
enable, description, local_topic, connector, resource_opts, parameters
|
||||
].
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
desc/1
|
||||
]).
|
||||
|
||||
%% emqx_bridge_enterprise "unofficial" API
|
||||
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
|
|
|
@ -134,7 +134,7 @@ start(
|
|||
|
||||
-spec stop(resource_id()) -> ok | {error, term()}.
|
||||
stop(ResourceId) ->
|
||||
?tp(gcp_pubsub_stop, #{resource_id => ResourceId}),
|
||||
?tp(gcp_pubsub_stop, #{instance_id => ResourceId, resource_id => ResourceId}),
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_gcp_pubsub_bridge",
|
||||
connector => ResourceId
|
||||
|
|
|
@ -8,23 +8,30 @@
|
|||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-type config() :: #{
|
||||
attributes_template := [#{key := binary(), value := binary()}],
|
||||
-type connector_config() :: #{
|
||||
connect_timeout := emqx_schema:duration_ms(),
|
||||
max_retries := non_neg_integer(),
|
||||
ordering_key_template := binary(),
|
||||
payload_template := binary(),
|
||||
pubsub_topic := binary(),
|
||||
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
|
||||
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
|
||||
any() => term()
|
||||
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json()
|
||||
}.
|
||||
-type state() :: #{
|
||||
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
|
||||
-type action_config() :: #{
|
||||
parameters := #{
|
||||
attributes_template := [#{key := binary(), value := binary()}],
|
||||
ordering_key_template := binary(),
|
||||
payload_template := binary(),
|
||||
pubsub_topic := binary()
|
||||
},
|
||||
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}
|
||||
}.
|
||||
-type connector_state() :: #{
|
||||
client := emqx_bridge_gcp_pubsub_client:state(),
|
||||
installed_actions := #{action_resource_id() => action_state()},
|
||||
project_id := emqx_bridge_gcp_pubsub_client:project_id()
|
||||
}.
|
||||
-type action_state() :: #{
|
||||
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
|
||||
ordering_key_template := emqx_placeholder:tmpl_token(),
|
||||
payload_template := emqx_placeholder:tmpl_token(),
|
||||
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
|
||||
pubsub_topic := binary()
|
||||
}.
|
||||
-type headers() :: emqx_bridge_gcp_pubsub_client:headers().
|
||||
|
@ -41,7 +48,11 @@
|
|||
on_query_async/4,
|
||||
on_batch_query/3,
|
||||
on_batch_query_async/4,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
]).
|
||||
|
||||
-export([reply_delegator/2]).
|
||||
|
@ -54,53 +65,45 @@ callback_mode() -> async_if_possible.
|
|||
|
||||
query_mode(_Config) -> async.
|
||||
|
||||
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
|
||||
-spec on_start(connector_resource_id(), connector_config()) ->
|
||||
{ok, connector_state()} | {error, term()}.
|
||||
on_start(InstanceId, Config0) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_gcp_pubsub_bridge",
|
||||
config => Config0
|
||||
}),
|
||||
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
|
||||
#{
|
||||
attributes_template := AttributesTemplate,
|
||||
ordering_key_template := OrderingKeyTemplate,
|
||||
payload_template := PayloadTemplate,
|
||||
pubsub_topic := PubSubTopic,
|
||||
service_account_json := #{<<"project_id">> := ProjectId}
|
||||
} = Config,
|
||||
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
|
||||
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
|
||||
{ok, Client} ->
|
||||
State = #{
|
||||
client => Client,
|
||||
attributes_template => preproc_attributes(AttributesTemplate),
|
||||
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
|
||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
project_id => ProjectId,
|
||||
pubsub_topic => PubSubTopic
|
||||
installed_actions => #{},
|
||||
project_id => ProjectId
|
||||
},
|
||||
{ok, State};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
|
||||
-spec on_stop(connector_resource_id(), connector_state()) -> ok | {error, term()}.
|
||||
on_stop(InstanceId, _State) ->
|
||||
emqx_bridge_gcp_pubsub_client:stop(InstanceId).
|
||||
|
||||
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
||||
-spec on_get_status(connector_resource_id(), connector_state()) -> connected | disconnected.
|
||||
on_get_status(_InstanceId, #{client := Client} = _State) ->
|
||||
emqx_bridge_gcp_pubsub_client:get_status(Client).
|
||||
|
||||
-spec on_query(
|
||||
resource_id(),
|
||||
{send_message, map()},
|
||||
state()
|
||||
connector_resource_id(),
|
||||
{message_tag(), map()},
|
||||
connector_state()
|
||||
) ->
|
||||
{ok, map()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
on_query(ResourceId, {send_message, Selected}, State) ->
|
||||
Requests = [{send_message, Selected}],
|
||||
on_query(ResourceId, {MessageTag, Selected}, State) ->
|
||||
Requests = [{MessageTag, Selected}],
|
||||
?TRACE(
|
||||
"QUERY_SYNC",
|
||||
"gcp_pubsub_received",
|
||||
|
@ -109,24 +112,25 @@ on_query(ResourceId, {send_message, Selected}, State) ->
|
|||
do_send_requests_sync(State, Requests, ResourceId).
|
||||
|
||||
-spec on_query_async(
|
||||
resource_id(),
|
||||
{send_message, map()},
|
||||
connector_resource_id(),
|
||||
{message_tag(), map()},
|
||||
{ReplyFun :: function(), Args :: list()},
|
||||
state()
|
||||
connector_state()
|
||||
) -> {ok, pid()} | {error, no_pool_worker_available}.
|
||||
on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
|
||||
Requests = [{send_message, Selected}],
|
||||
on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) ->
|
||||
Requests = [{MessageTag, Selected}],
|
||||
?TRACE(
|
||||
"QUERY_ASYNC",
|
||||
"gcp_pubsub_received",
|
||||
#{requests => Requests, connector => ResourceId, state => State}
|
||||
),
|
||||
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
|
||||
do_send_requests_async(State, Requests, ReplyFunAndArgs).
|
||||
|
||||
-spec on_batch_query(
|
||||
resource_id(),
|
||||
[{send_message, map()}],
|
||||
state()
|
||||
connector_resource_id(),
|
||||
[{message_tag(), map()}],
|
||||
connector_state()
|
||||
) ->
|
||||
{ok, map()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
|
@ -140,10 +144,10 @@ on_batch_query(ResourceId, Requests, State) ->
|
|||
do_send_requests_sync(State, Requests, ResourceId).
|
||||
|
||||
-spec on_batch_query_async(
|
||||
resource_id(),
|
||||
[{send_message, map()}],
|
||||
connector_resource_id(),
|
||||
[{message_tag(), map()}],
|
||||
{ReplyFun :: function(), Args :: list()},
|
||||
state()
|
||||
connector_state()
|
||||
) -> {ok, pid()} | {error, no_pool_worker_available}.
|
||||
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
|
||||
?TRACE(
|
||||
|
@ -151,32 +155,92 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
|
|||
"gcp_pubsub_received",
|
||||
#{requests => Requests, connector => ResourceId, state => State}
|
||||
),
|
||||
?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
|
||||
do_send_requests_async(State, Requests, ReplyFunAndArgs).
|
||||
|
||||
-spec on_add_channel(
|
||||
connector_resource_id(),
|
||||
connector_state(),
|
||||
action_resource_id(),
|
||||
action_config()
|
||||
) ->
|
||||
{ok, connector_state()}.
|
||||
on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) ->
|
||||
#{installed_actions := InstalledActions0} = ConnectorState0,
|
||||
ChannelState = install_channel(ActionConfig),
|
||||
InstalledActions = InstalledActions0#{ActionId => ChannelState},
|
||||
ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
|
||||
{ok, ConnectorState}.
|
||||
|
||||
-spec on_remove_channel(
|
||||
connector_resource_id(),
|
||||
connector_state(),
|
||||
action_resource_id()
|
||||
) ->
|
||||
{ok, connector_state()}.
|
||||
on_remove_channel(_ConnectorResId, ConnectorState0, ActionId) ->
|
||||
#{installed_actions := InstalledActions0} = ConnectorState0,
|
||||
InstalledActions = maps:remove(ActionId, InstalledActions0),
|
||||
ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
|
||||
{ok, ConnectorState}.
|
||||
|
||||
-spec on_get_channels(connector_resource_id()) ->
|
||||
[{action_resource_id(), action_config()}].
|
||||
on_get_channels(ConnectorResId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
|
||||
|
||||
-spec on_get_channel_status(connector_resource_id(), action_resource_id(), connector_state()) ->
|
||||
health_check_status().
|
||||
on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) ->
|
||||
%% Should we check the underlying client? Same as on_get_status?
|
||||
?status_connected.
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
%% TODO: check if topic exists ("unhealthy target")
|
||||
install_channel(ActionConfig) ->
|
||||
#{
|
||||
parameters := #{
|
||||
attributes_template := AttributesTemplate,
|
||||
ordering_key_template := OrderingKeyTemplate,
|
||||
payload_template := PayloadTemplate,
|
||||
pubsub_topic := PubSubTopic
|
||||
}
|
||||
} = ActionConfig,
|
||||
#{
|
||||
attributes_template => preproc_attributes(AttributesTemplate),
|
||||
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
|
||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
pubsub_topic => PubSubTopic
|
||||
}.
|
||||
|
||||
-spec do_send_requests_sync(
|
||||
state(),
|
||||
[{send_message, map()}],
|
||||
connector_state(),
|
||||
[{message_tag(), map()}],
|
||||
resource_id()
|
||||
) ->
|
||||
{ok, status_code(), headers()}
|
||||
| {ok, status_code(), headers(), body()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
do_send_requests_sync(State, Requests, InstanceId) ->
|
||||
#{client := Client} = State,
|
||||
do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
|
||||
?tp(gcp_pubsub_producer_sync, #{instance_id => InstanceId, requests => Requests}),
|
||||
#{client := Client} = ConnectorState,
|
||||
%% is it safe to assume the tag is the same??? And not empty???
|
||||
[{MessageTag, _} | _] = Requests,
|
||||
#{installed_actions := InstalledActions} = ConnectorState,
|
||||
ChannelState = maps:get(MessageTag, InstalledActions),
|
||||
Payloads =
|
||||
lists:map(
|
||||
fun({send_message, Selected}) ->
|
||||
encode_payload(State, Selected)
|
||||
fun({_MessageTag, Selected}) ->
|
||||
encode_payload(ChannelState, Selected)
|
||||
end,
|
||||
Requests
|
||||
),
|
||||
Body = to_pubsub_request(Payloads),
|
||||
Path = publish_path(State),
|
||||
Path = publish_path(ConnectorState, ChannelState),
|
||||
Method = post,
|
||||
Request = {prepared_request, {Method, Path, Body}},
|
||||
Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
|
||||
|
@ -184,21 +248,25 @@ do_send_requests_sync(State, Requests, InstanceId) ->
|
|||
handle_result(Result, Request, QueryMode, InstanceId).
|
||||
|
||||
-spec do_send_requests_async(
|
||||
state(),
|
||||
[{send_message, map()}],
|
||||
connector_state(),
|
||||
[{message_tag(), map()}],
|
||||
{ReplyFun :: function(), Args :: list()}
|
||||
) -> {ok, pid()} | {error, no_pool_worker_available}.
|
||||
do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
|
||||
#{client := Client} = State,
|
||||
do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
|
||||
#{client := Client} = ConnectorState,
|
||||
%% is it safe to assume the tag is the same??? And not empty???
|
||||
[{MessageTag, _} | _] = Requests,
|
||||
#{installed_actions := InstalledActions} = ConnectorState,
|
||||
ChannelState = maps:get(MessageTag, InstalledActions),
|
||||
Payloads =
|
||||
lists:map(
|
||||
fun({send_message, Selected}) ->
|
||||
encode_payload(State, Selected)
|
||||
fun({_MessageTag, Selected}) ->
|
||||
encode_payload(ChannelState, Selected)
|
||||
end,
|
||||
Requests
|
||||
),
|
||||
Body = to_pubsub_request(Payloads),
|
||||
Path = publish_path(State),
|
||||
Path = publish_path(ConnectorState, ChannelState),
|
||||
Method = post,
|
||||
Request = {prepared_request, {Method, Path, Body}},
|
||||
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
|
||||
|
@ -206,18 +274,18 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
|
|||
Request, ReplyFunAndArgs, Client
|
||||
).
|
||||
|
||||
-spec encode_payload(state(), Selected :: map()) ->
|
||||
-spec encode_payload(action_state(), Selected :: map()) ->
|
||||
#{
|
||||
data := binary(),
|
||||
attributes => #{binary() => binary()},
|
||||
'orderingKey' => binary()
|
||||
}.
|
||||
encode_payload(State, Selected) ->
|
||||
encode_payload(ActionState, Selected) ->
|
||||
#{
|
||||
attributes_template := AttributesTemplate,
|
||||
ordering_key_template := OrderingKeyTemplate,
|
||||
payload_template := PayloadTemplate
|
||||
} = State,
|
||||
} = ActionState,
|
||||
Data = render_payload(PayloadTemplate, Selected),
|
||||
OrderingKey = render_key(OrderingKeyTemplate, Selected),
|
||||
Attributes = proc_attributes(AttributesTemplate, Selected),
|
||||
|
@ -307,13 +375,8 @@ proc_attributes(AttributesTemplate, Selected) ->
|
|||
to_pubsub_request(Payloads) ->
|
||||
emqx_utils_json:encode(#{messages => Payloads}).
|
||||
|
||||
-spec publish_path(state()) -> binary().
|
||||
publish_path(
|
||||
_State = #{
|
||||
project_id := ProjectId,
|
||||
pubsub_topic := PubSubTopic
|
||||
}
|
||||
) ->
|
||||
-spec publish_path(connector_state(), action_state()) -> binary().
|
||||
publish_path(#{project_id := ProjectId}, #{pubsub_topic := PubSubTopic}) ->
|
||||
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
|
||||
|
||||
handle_result({error, Reason}, _Request, QueryMode, ResourceId) when
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_gcp_pubsub_producer_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
bridge_v1_config_to_action_config/2
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> gcp_pubsub.
|
||||
|
||||
action_type_name() -> gcp_pubsub_producer.
|
||||
|
||||
connector_type_name() -> gcp_pubsub_producer.
|
||||
|
||||
schema_module() -> emqx_bridge_gcp_pubsub_producer_schema.
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
CommonActionKeys = emqx_bridge_v2_schema:top_level_common_action_keys(),
|
||||
ParamsKeys = producer_action_parameters_field_keys(),
|
||||
Config1 = maps:with(CommonActionKeys, BridgeV1Config),
|
||||
Params = maps:with(ParamsKeys, BridgeV1Config),
|
||||
Config1#{
|
||||
<<"connector">> => ConnectorName,
|
||||
<<"parameters">> => Params
|
||||
}.
|
||||
|
||||
%%------------------------------------------------------------------------------------------
|
||||
%% Internal helper fns
|
||||
%%------------------------------------------------------------------------------------------
|
||||
|
||||
producer_action_parameters_field_keys() ->
|
||||
[
|
||||
to_bin(K)
|
||||
|| {K, _} <- emqx_bridge_gcp_pubsub_producer_schema:fields(action_parameters)
|
||||
].
|
||||
|
||||
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
|
@ -0,0 +1,223 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_gcp_pubsub_producer_schema).
|
||||
|
||||
-import(hoconsc, [mk/2, ref/2]).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
%% `hocon_schema' API
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `hocon_schema' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() ->
|
||||
"gcp_pubsub_producer".
|
||||
|
||||
roots() ->
|
||||
[].
|
||||
|
||||
%%=========================================
|
||||
%% Action fields
|
||||
%%=========================================
|
||||
fields(action) ->
|
||||
{gcp_pubsub_producer,
|
||||
mk(
|
||||
hoconsc:map(name, ref(?MODULE, producer_action)),
|
||||
#{
|
||||
desc => <<"GCP PubSub Producer Action Config">>,
|
||||
required => false
|
||||
}
|
||||
)};
|
||||
fields(producer_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
mk(
|
||||
ref(?MODULE, action_parameters),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(producer_action)
|
||||
}
|
||||
)
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
UnsupportedFields = [local_topic],
|
||||
lists:filter(
|
||||
fun({Key, _Schema}) -> not lists:member(Key, UnsupportedFields) end,
|
||||
emqx_bridge_gcp_pubsub:fields(producer)
|
||||
);
|
||||
%%=========================================
|
||||
%% Connector fields
|
||||
%%=========================================
|
||||
fields("config_connector") ->
|
||||
%% FIXME
|
||||
emqx_connector_schema:common_fields() ++
|
||||
emqx_bridge_gcp_pubsub:fields(connector_config) ++
|
||||
emqx_resource_schema:fields("resource_opts");
|
||||
%%=========================================
|
||||
%% HTTP API fields
|
||||
%%=========================================
|
||||
fields("get_bridge_v2") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
|
||||
fields("post_bridge_v2") ->
|
||||
[type_field(), name_field() | fields("put_bridge_v2")];
|
||||
fields("put_bridge_v2") ->
|
||||
fields(producer_action).
|
||||
|
||||
desc("config_connector") ->
|
||||
?DESC("config_connector");
|
||||
desc(action_parameters) ->
|
||||
?DESC(action_parameters);
|
||||
desc(producer_action) ->
|
||||
?DESC(producer_action);
|
||||
desc(_Name) ->
|
||||
undefined.
|
||||
|
||||
type_field() ->
|
||||
{type, mk(gcp_pubsub_producer, #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"gcp_pubsub_producer">> => #{
|
||||
summary => <<"GCP PubSub Producer Action">>,
|
||||
value => action_example(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"gcp_pubsub_producer">> => #{
|
||||
summary => <<"GCP PubSub Producer Connector">>,
|
||||
value => connector_example(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
emqx_bridge_gcp_pubsub:conn_bridge_examples(Method).
|
||||
|
||||
action_example(post) ->
|
||||
maps:merge(
|
||||
action_example(put),
|
||||
#{
|
||||
type => <<"gcp_pubsub_producer">>,
|
||||
name => <<"my_action">>
|
||||
}
|
||||
);
|
||||
action_example(get) ->
|
||||
maps:merge(
|
||||
action_example(put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
action_example(put) ->
|
||||
#{
|
||||
enable => true,
|
||||
connector => <<"my_connector_name">>,
|
||||
description => <<"My action">>,
|
||||
local_topic => <<"local/topic">>,
|
||||
resource_opts =>
|
||||
#{batch_size => 5},
|
||||
parameters =>
|
||||
#{
|
||||
pubsub_topic => <<"mytopic">>,
|
||||
ordering_key_template => <<"${payload.ok}">>,
|
||||
payload_template => <<"${payload}">>,
|
||||
attributes_template =>
|
||||
[
|
||||
#{
|
||||
key => <<"${payload.attrs.k}">>,
|
||||
value => <<"${payload.attrs.v}">>
|
||||
}
|
||||
]
|
||||
}
|
||||
}.
|
||||
|
||||
connector_example(get) ->
|
||||
maps:merge(
|
||||
connector_example(put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
connector_example(post) ->
|
||||
maps:merge(
|
||||
connector_example(put),
|
||||
#{
|
||||
type => <<"gcp_pubsub_producer">>,
|
||||
name => <<"my_connector">>
|
||||
}
|
||||
);
|
||||
connector_example(put) ->
|
||||
#{
|
||||
enable => true,
|
||||
connect_timeout => <<"10s">>,
|
||||
pool_size => 8,
|
||||
pipelining => 100,
|
||||
max_retries => 2,
|
||||
resource_opts => #{request_ttl => <<"60s">>},
|
||||
service_account_json =>
|
||||
#{
|
||||
auth_provider_x509_cert_url =>
|
||||
<<"https://www.googleapis.com/oauth2/v1/certs">>,
|
||||
auth_uri =>
|
||||
<<"https://accounts.google.com/o/oauth2/auth">>,
|
||||
client_email =>
|
||||
<<"test@myproject.iam.gserviceaccount.com">>,
|
||||
client_id => <<"123812831923812319190">>,
|
||||
client_x509_cert_url =>
|
||||
<<
|
||||
"https://www.googleapis.com/robot/v1/"
|
||||
"metadata/x509/test%40myproject.iam.gserviceaccount.com"
|
||||
>>,
|
||||
private_key =>
|
||||
<<
|
||||
"-----BEGIN PRIVATE KEY-----\n"
|
||||
"MIIEvQI..."
|
||||
>>,
|
||||
private_key_id => <<"kid">>,
|
||||
project_id => <<"myproject">>,
|
||||
token_uri =>
|
||||
<<"https://oauth2.googleapis.com/token">>,
|
||||
type => <<"service_account">>
|
||||
}
|
||||
}.
|
|
@ -13,8 +13,12 @@
|
|||
-include_lib("jose/include/jose_jwt.hrl").
|
||||
-include_lib("jose/include/jose_jws.hrl").
|
||||
|
||||
-define(BRIDGE_TYPE, gcp_pubsub).
|
||||
-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub">>).
|
||||
-define(ACTION_TYPE, gcp_pubsub_producer).
|
||||
-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>).
|
||||
-define(CONNECTOR_TYPE, gcp_pubsub_producer).
|
||||
-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>).
|
||||
-define(BRIDGE_V1_TYPE, gcp_pubsub).
|
||||
-define(BRIDGE_V1_TYPE_BIN, <<"gcp_pubsub">>).
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
|
@ -141,19 +145,24 @@ end_per_testcase(_TestCase, _Config) ->
|
|||
|
||||
generate_config(Config0) ->
|
||||
#{
|
||||
name := Name,
|
||||
name := ActionName,
|
||||
config_string := ConfigString,
|
||||
pubsub_config := PubSubConfig,
|
||||
service_account_json := ServiceAccountJSON
|
||||
} = gcp_pubsub_config(Config0),
|
||||
ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name),
|
||||
%% FIXME
|
||||
%% `emqx_bridge_resource:resource_id' requires an existing connector in the config.....
|
||||
ConnectorName = <<"connector_", ActionName/binary>>,
|
||||
ConnectorResourceId = <<"connector:", ?CONNECTOR_TYPE_BIN/binary, ":", ConnectorName/binary>>,
|
||||
ActionResourceId = emqx_bridge_v2:id(?ACTION_TYPE_BIN, ActionName, ConnectorName),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, ActionName),
|
||||
[
|
||||
{gcp_pubsub_name, Name},
|
||||
{gcp_pubsub_name, ActionName},
|
||||
{gcp_pubsub_config, PubSubConfig},
|
||||
{gcp_pubsub_config_string, ConfigString},
|
||||
{service_account_json, ServiceAccountJSON},
|
||||
{resource_id, ResourceId},
|
||||
{connector_resource_id, ConnectorResourceId},
|
||||
{action_resource_id, ActionResourceId},
|
||||
{bridge_id, BridgeId}
|
||||
| Config0
|
||||
].
|
||||
|
@ -168,7 +177,7 @@ delete_all_bridges() ->
|
|||
).
|
||||
|
||||
delete_bridge(Config) ->
|
||||
Type = ?BRIDGE_TYPE,
|
||||
Type = ?BRIDGE_V1_TYPE,
|
||||
Name = ?config(gcp_pubsub_name, Config),
|
||||
ct:pal("deleting bridge ~p", [{Type, Name}]),
|
||||
emqx_bridge:remove(Type, Name).
|
||||
|
@ -177,7 +186,7 @@ create_bridge(Config) ->
|
|||
create_bridge(Config, _GCPPubSubConfigOverrides = #{}).
|
||||
|
||||
create_bridge(Config, GCPPubSubConfigOverrides) ->
|
||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||
TypeBin = ?BRIDGE_V1_TYPE_BIN,
|
||||
Name = ?config(gcp_pubsub_name, Config),
|
||||
GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config),
|
||||
GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides),
|
||||
|
@ -190,7 +199,7 @@ create_bridge_http(Config) ->
|
|||
create_bridge_http(Config, _GCPPubSubConfigOverrides = #{}).
|
||||
|
||||
create_bridge_http(Config, GCPPubSubConfigOverrides) ->
|
||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||
TypeBin = ?BRIDGE_V1_TYPE_BIN,
|
||||
Name = ?config(gcp_pubsub_name, Config),
|
||||
GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config),
|
||||
GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides),
|
||||
|
@ -225,7 +234,7 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
|
|||
|
||||
create_rule_and_action_http(Config) ->
|
||||
GCPPubSubName = ?config(gcp_pubsub_name, Config),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, GCPPubSubName),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, GCPPubSubName),
|
||||
Params = #{
|
||||
enable => true,
|
||||
sql => <<"SELECT * FROM \"t/topic\"">>,
|
||||
|
@ -382,9 +391,14 @@ assert_metrics(ExpectedMetrics, ResourceId) ->
|
|||
CurrentMetrics = current_metrics(ResourceId),
|
||||
TelemetryTable = get(telemetry_table),
|
||||
RecordedEvents = ets:tab2list(TelemetryTable),
|
||||
?assertEqual(ExpectedMetrics, Metrics, #{
|
||||
current_metrics => CurrentMetrics, recorded_events => RecordedEvents
|
||||
}),
|
||||
?retry(
|
||||
_Sleep0 = 300,
|
||||
_Attempts = 20,
|
||||
?assertEqual(ExpectedMetrics, Metrics, #{
|
||||
current_metrics => CurrentMetrics,
|
||||
recorded_events => RecordedEvents
|
||||
})
|
||||
),
|
||||
ok.
|
||||
|
||||
assert_empty_metrics(ResourceId) ->
|
||||
|
@ -535,8 +549,30 @@ install_telemetry_handler(TestCase) ->
|
|||
end),
|
||||
Tid.
|
||||
|
||||
mk_res_id_filter(ResourceId) ->
|
||||
fun(Event) ->
|
||||
case Event of
|
||||
#{metadata := #{resource_id := ResId}} when ResId =:= ResourceId ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end.
|
||||
|
||||
wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
|
||||
Events = receive_all_events(GaugeName, Timeout),
|
||||
wait_until_gauge_is(#{
|
||||
gauge_name => GaugeName,
|
||||
expected => ExpectedValue,
|
||||
timeout => Timeout
|
||||
}).
|
||||
|
||||
wait_until_gauge_is(#{} = Opts) ->
|
||||
GaugeName = maps:get(gauge_name, Opts),
|
||||
ExpectedValue = maps:get(expected, Opts),
|
||||
Timeout = maps:get(timeout, Opts),
|
||||
MaxEvents = maps:get(max_events, Opts, 10),
|
||||
FilterFn = maps:get(filter_fn, Opts, fun(_Event) -> true end),
|
||||
Events = receive_all_events(GaugeName, Timeout, MaxEvents, FilterFn),
|
||||
case length(Events) > 0 andalso lists:last(Events) of
|
||||
#{measurements := #{gauge_set := ExpectedValue}} ->
|
||||
ok;
|
||||
|
@ -550,15 +586,36 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
|
|||
ct:pal("no ~p gauge events received!", [GaugeName])
|
||||
end.
|
||||
|
||||
receive_all_events(EventName, Timeout) ->
|
||||
receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
|
||||
receive_all_events(EventName, Timeout, MaxEvents, FilterFn) ->
|
||||
receive_all_events(EventName, Timeout, MaxEvents, FilterFn, _Count = 0, _Acc = []).
|
||||
|
||||
receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
|
||||
receive_all_events(_EventName, _Timeout, MaxEvents, _FilterFn, Count, Acc) when
|
||||
Count >= MaxEvents
|
||||
->
|
||||
lists:reverse(Acc);
|
||||
receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
|
||||
receive_all_events(EventName, Timeout, MaxEvents, FilterFn, Count, Acc) ->
|
||||
receive
|
||||
{telemetry, #{name := [_, _, EventName]} = Event} ->
|
||||
receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
|
||||
case FilterFn(Event) of
|
||||
true ->
|
||||
receive_all_events(
|
||||
EventName,
|
||||
Timeout,
|
||||
MaxEvents,
|
||||
FilterFn,
|
||||
Count + 1,
|
||||
[Event | Acc]
|
||||
);
|
||||
false ->
|
||||
receive_all_events(
|
||||
EventName,
|
||||
Timeout,
|
||||
MaxEvents,
|
||||
FilterFn,
|
||||
Count,
|
||||
Acc
|
||||
)
|
||||
end
|
||||
after Timeout ->
|
||||
lists:reverse(Acc)
|
||||
end.
|
||||
|
@ -597,14 +654,14 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_publish_success(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
ServiceAccountJSON = ?config(service_account_json, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
Topic = <<"t/topic">>,
|
||||
?assertMatch({ok, _}, create_bridge(Config)),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
Payload = <<"payload">>,
|
||||
Message = emqx_message:make(Topic, Payload),
|
||||
emqx:publish(Message),
|
||||
|
@ -620,7 +677,7 @@ t_publish_success(Config) ->
|
|||
DecodedMessages
|
||||
),
|
||||
%% to avoid test flakiness
|
||||
wait_telemetry_event(TelemetryTable, success, ResourceId),
|
||||
wait_telemetry_event(TelemetryTable, success, ActionResourceId),
|
||||
wait_until_gauge_is(queuing, 0, 500),
|
||||
wait_until_gauge_is(inflight, 0, 500),
|
||||
assert_metrics(
|
||||
|
@ -633,7 +690,7 @@ t_publish_success(Config) ->
|
|||
retried => 0,
|
||||
success => 1
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -662,12 +719,12 @@ t_publish_success_infinity_timeout(Config) ->
|
|||
ok.
|
||||
|
||||
t_publish_success_local_topic(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
ServiceAccountJSON = ?config(service_account_json, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
LocalTopic = <<"local/topic">>,
|
||||
{ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
Payload = <<"payload">>,
|
||||
Message = emqx_message:make(LocalTopic, Payload),
|
||||
emqx:publish(Message),
|
||||
|
@ -682,7 +739,7 @@ t_publish_success_local_topic(Config) ->
|
|||
DecodedMessages
|
||||
),
|
||||
%% to avoid test flakiness
|
||||
wait_telemetry_event(TelemetryTable, success, ResourceId),
|
||||
wait_telemetry_event(TelemetryTable, success, ActionResourceId),
|
||||
wait_until_gauge_is(queuing, 0, 500),
|
||||
wait_until_gauge_is(inflight, 0, 500),
|
||||
assert_metrics(
|
||||
|
@ -695,7 +752,7 @@ t_publish_success_local_topic(Config) ->
|
|||
retried => 0,
|
||||
success => 1
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -704,7 +761,7 @@ t_create_via_http(Config) ->
|
|||
ok.
|
||||
|
||||
t_publish_templated(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
ServiceAccountJSON = ?config(service_account_json, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
Topic = <<"t/topic">>,
|
||||
|
@ -721,7 +778,7 @@ t_publish_templated(Config) ->
|
|||
),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
Payload = <<"payload">>,
|
||||
Message =
|
||||
emqx_message:set_header(
|
||||
|
@ -747,7 +804,7 @@ t_publish_templated(Config) ->
|
|||
DecodedMessages
|
||||
),
|
||||
%% to avoid test flakiness
|
||||
wait_telemetry_event(TelemetryTable, success, ResourceId),
|
||||
wait_telemetry_event(TelemetryTable, success, ActionResourceId),
|
||||
wait_until_gauge_is(queuing, 0, 500),
|
||||
wait_until_gauge_is(inflight, 0, 500),
|
||||
assert_metrics(
|
||||
|
@ -760,7 +817,7 @@ t_publish_templated(Config) ->
|
|||
retried => 0,
|
||||
success => 1
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -774,7 +831,7 @@ t_publish_success_batch(Config) ->
|
|||
end.
|
||||
|
||||
test_publish_success_batch(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
ServiceAccountJSON = ?config(service_account_json, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
Topic = <<"t/topic">>,
|
||||
|
@ -796,7 +853,7 @@ test_publish_success_batch(Config) ->
|
|||
),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
NumMessages = BatchSize * 2,
|
||||
Messages = [emqx_message:make(Topic, integer_to_binary(N)) || N <- lists:seq(1, NumMessages)],
|
||||
%% publish in parallel to avoid each client blocking and then
|
||||
|
@ -822,7 +879,7 @@ test_publish_success_batch(Config) ->
|
|||
wait_telemetry_event(
|
||||
TelemetryTable,
|
||||
success,
|
||||
ResourceId,
|
||||
ActionResourceId,
|
||||
#{timeout => 15_000, n_events => NumMessages}
|
||||
),
|
||||
wait_until_gauge_is(queuing, 0, _Timeout = 400),
|
||||
|
@ -837,7 +894,7 @@ test_publish_success_batch(Config) ->
|
|||
retried => 0,
|
||||
success => NumMessages
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -1045,7 +1102,7 @@ t_jose_other_error(Config) ->
|
|||
fun(Res, Trace) ->
|
||||
?assertMatch({ok, _}, Res),
|
||||
?assertMatch(
|
||||
[#{error := {invalid_private_key, {unknown, error}}}],
|
||||
[#{error := {invalid_private_key, {unknown, error}}} | _],
|
||||
?of_kind(gcp_pubsub_connector_startup_error, Trace)
|
||||
),
|
||||
ok
|
||||
|
@ -1054,7 +1111,7 @@ t_jose_other_error(Config) ->
|
|||
ok.
|
||||
|
||||
t_publish_econnrefused(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ResourceId = ?config(connector_resource_id, Config),
|
||||
%% set pipelining to 1 so that one of the 2 requests is `pending'
|
||||
%% in ehttpc.
|
||||
{ok, _} = create_bridge(
|
||||
|
@ -1071,7 +1128,7 @@ t_publish_econnrefused(Config) ->
|
|||
do_econnrefused_or_timeout_test(Config, econnrefused).
|
||||
|
||||
t_publish_timeout(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
%% set pipelining to 1 so that one of the 2 requests is `pending'
|
||||
%% in ehttpc. also, we set the batch size to 1 to also ensure the
|
||||
%% requests are done separately.
|
||||
|
@ -1079,12 +1136,13 @@ t_publish_timeout(Config) ->
|
|||
<<"pipelining">> => 1,
|
||||
<<"resource_opts">> => #{
|
||||
<<"batch_size">> => 1,
|
||||
<<"resume_interval">> => <<"1s">>
|
||||
<<"resume_interval">> => <<"1s">>,
|
||||
<<"metrics_flush_interval">> => <<"700ms">>
|
||||
}
|
||||
}),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
TestPid = self(),
|
||||
TimeoutHandler =
|
||||
fun(Req0, State) ->
|
||||
|
@ -1107,7 +1165,8 @@ t_publish_timeout(Config) ->
|
|||
do_econnrefused_or_timeout_test(Config, timeout).
|
||||
|
||||
do_econnrefused_or_timeout_test(Config, Error) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
ConnectorResourceId = ?config(connector_resource_id, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
Topic = <<"t/topic">>,
|
||||
Payload = <<"payload">>,
|
||||
|
@ -1156,9 +1215,9 @@ do_econnrefused_or_timeout_test(Config, Error) ->
|
|||
case Error of
|
||||
econnrefused ->
|
||||
case ?of_kind(gcp_pubsub_request_failed, Trace) of
|
||||
[#{reason := Error, connector := ResourceId} | _] ->
|
||||
[#{reason := Error, connector := ConnectorResourceId} | _] ->
|
||||
ok;
|
||||
[#{reason := {closed, _Msg}, connector := ResourceId} | _] ->
|
||||
[#{reason := {closed, _Msg}, connector := ConnectorResourceId} | _] ->
|
||||
%% _Msg = "The connection was lost."
|
||||
ok;
|
||||
Trace0 ->
|
||||
|
@ -1182,7 +1241,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
|
|||
%% even waiting, hard to avoid flakiness... simpler to just sleep
|
||||
%% a bit until stabilization.
|
||||
ct:sleep(200),
|
||||
CurrentMetrics = current_metrics(ResourceId),
|
||||
CurrentMetrics = current_metrics(ActionResourceId),
|
||||
RecordedEvents = ets:tab2list(TelemetryTable),
|
||||
ct:pal("telemetry events: ~p", [RecordedEvents]),
|
||||
?assertMatch(
|
||||
|
@ -1198,7 +1257,19 @@ do_econnrefused_or_timeout_test(Config, Error) ->
|
|||
CurrentMetrics
|
||||
);
|
||||
timeout ->
|
||||
wait_until_gauge_is(inflight, 0, _Timeout = 1_000),
|
||||
wait_telemetry_event(
|
||||
TelemetryTable,
|
||||
late_reply,
|
||||
ActionResourceId,
|
||||
#{timeout => 5_000, n_events => 2}
|
||||
),
|
||||
wait_until_gauge_is(#{
|
||||
gauge_name => inflight,
|
||||
expected => 0,
|
||||
filter_fn => mk_res_id_filter(ActionResourceId),
|
||||
timeout => 1_000,
|
||||
max_events => 20
|
||||
}),
|
||||
wait_until_gauge_is(queuing, 0, _Timeout = 1_000),
|
||||
assert_metrics(
|
||||
#{
|
||||
|
@ -1211,7 +1282,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
|
|||
success => 0,
|
||||
late_reply => 2
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
)
|
||||
end,
|
||||
|
||||
|
@ -1334,7 +1405,8 @@ t_failure_no_body(Config) ->
|
|||
ok.
|
||||
|
||||
t_unrecoverable_error(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ActionResourceId = ?config(action_resource_id, Config),
|
||||
TelemetryTable = ?config(telemetry_table, Config),
|
||||
TestPid = self(),
|
||||
FailureNoBodyHandler =
|
||||
fun(Req0, State) ->
|
||||
|
@ -1358,7 +1430,7 @@ t_unrecoverable_error(Config) ->
|
|||
ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
|
||||
Topic = <<"t/topic">>,
|
||||
{ok, _} = create_bridge(Config),
|
||||
assert_empty_metrics(ResourceId),
|
||||
assert_empty_metrics(ActionResourceId),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
Payload = <<"payload">>,
|
||||
|
@ -1386,6 +1458,7 @@ t_unrecoverable_error(Config) ->
|
|||
%% removed, this inflight should be 1, because we retry if
|
||||
%% the worker is killed.
|
||||
wait_until_gauge_is(inflight, 0, _Timeout = 400),
|
||||
wait_telemetry_event(TelemetryTable, failed, ActionResourceId),
|
||||
assert_metrics(
|
||||
#{
|
||||
dropped => 0,
|
||||
|
@ -1398,7 +1471,7 @@ t_unrecoverable_error(Config) ->
|
|||
retried => 0,
|
||||
success => 0
|
||||
},
|
||||
ResourceId
|
||||
ActionResourceId
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -1407,7 +1480,7 @@ t_stop(Config) ->
|
|||
{ok, _} = create_bridge(Config),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
|
||||
emqx_bridge_resource:stop(?BRIDGE_V1_TYPE, Name),
|
||||
#{?snk_kind := gcp_pubsub_stop},
|
||||
5_000
|
||||
),
|
||||
|
@ -1421,13 +1494,13 @@ t_stop(Config) ->
|
|||
ok.
|
||||
|
||||
t_get_status_ok(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ResourceId = ?config(connector_resource_id, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
||||
ok.
|
||||
|
||||
t_get_status_no_worker(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ResourceId = ?config(connector_resource_id, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
emqx_common_test_helpers:with_mock(
|
||||
ehttpc,
|
||||
|
@ -1441,7 +1514,7 @@ t_get_status_no_worker(Config) ->
|
|||
ok.
|
||||
|
||||
t_get_status_down(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ResourceId = ?config(connector_resource_id, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
emqx_common_test_helpers:with_mock(
|
||||
ehttpc,
|
||||
|
@ -1457,7 +1530,7 @@ t_get_status_down(Config) ->
|
|||
ok.
|
||||
|
||||
t_get_status_timeout_calling_workers(Config) ->
|
||||
ResourceId = ?config(resource_id, Config),
|
||||
ResourceId = ?config(connector_resource_id, Config),
|
||||
{ok, _} = create_bridge(Config),
|
||||
emqx_common_test_helpers:with_mock(
|
||||
ehttpc,
|
||||
|
@ -1520,7 +1593,7 @@ t_on_start_ehttpc_pool_start_failure(Config) ->
|
|||
),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{reason := some_error}],
|
||||
[#{reason := some_error} | _],
|
||||
?of_kind(gcp_pubsub_ehttpc_pool_start_failure, Trace)
|
||||
),
|
||||
ok
|
||||
|
@ -1668,7 +1741,7 @@ t_attributes(Config) ->
|
|||
),
|
||||
%% ensure loading cluster override file doesn't mangle the attribute
|
||||
%% placeholders...
|
||||
#{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} =
|
||||
#{<<"actions">> := #{?ACTION_TYPE_BIN := #{Name := RawConf}}} =
|
||||
emqx_config:read_override_conf(#{override_to => cluster}),
|
||||
?assertEqual(
|
||||
[
|
||||
|
@ -1689,7 +1762,7 @@ t_attributes(Config) ->
|
|||
<<"value">> => <<"${.payload.value}">>
|
||||
}
|
||||
],
|
||||
maps:get(<<"attributes_template">>, RawConf)
|
||||
emqx_utils_maps:deep_get([<<"parameters">>, <<"attributes_template">>], RawConf)
|
||||
),
|
||||
ok
|
||||
end,
|
||||
|
|
|
@ -25,6 +25,8 @@ resource_type(azure_event_hub_producer) ->
|
|||
emqx_bridge_kafka_impl_producer;
|
||||
resource_type(confluent_producer) ->
|
||||
emqx_bridge_kafka_impl_producer;
|
||||
resource_type(gcp_pubsub_producer) ->
|
||||
emqx_bridge_gcp_pubsub_impl_producer;
|
||||
resource_type(kafka_producer) ->
|
||||
emqx_bridge_kafka_impl_producer;
|
||||
resource_type(syskeeper_forwarder) ->
|
||||
|
@ -65,6 +67,14 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{gcp_pubsub_producer,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")),
|
||||
#{
|
||||
desc => <<"GCP PubSub Producer Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{kafka_producer,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
|
||||
|
|
|
@ -24,7 +24,8 @@
|
|||
|
||||
-export([
|
||||
transform_bridges_v1_to_connectors_and_bridges_v2/1,
|
||||
transform_bridge_v1_config_to_action_config/4
|
||||
transform_bridge_v1_config_to_action_config/4,
|
||||
top_level_common_connector_keys/0
|
||||
]).
|
||||
|
||||
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
||||
|
@ -32,6 +33,7 @@
|
|||
-export([get_response/0, put_request/0, post_request/0]).
|
||||
|
||||
-export([connector_type_to_bridge_types/1]).
|
||||
-export([common_fields/0]).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
enterprise_api_schemas(Method) ->
|
||||
|
@ -64,6 +66,7 @@ enterprise_fields_connectors() -> [].
|
|||
|
||||
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
|
||||
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
|
||||
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer];
|
||||
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
|
||||
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
|
||||
connector_type_to_bridge_types(syskeeper_proxy) -> [].
|
||||
|
@ -159,17 +162,20 @@ transform_bridge_v1_config_to_action_config(
|
|||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
).
|
||||
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
) ->
|
||||
TopKeys = [
|
||||
top_level_common_connector_keys() ->
|
||||
[
|
||||
<<"enable">>,
|
||||
<<"connector">>,
|
||||
<<"local_topic">>,
|
||||
<<"resource_opts">>,
|
||||
<<"description">>,
|
||||
<<"parameters">>
|
||||
],
|
||||
].
|
||||
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
) ->
|
||||
TopKeys = top_level_common_connector_keys(),
|
||||
TopKeysMap = maps:from_keys(TopKeys, true),
|
||||
%% Remove connector fields
|
||||
ActionMap0 = lists:foldl(
|
||||
|
@ -352,6 +358,12 @@ desc(connectors) ->
|
|||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
common_fields() ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{description, emqx_schema:description_schema()}
|
||||
].
|
||||
|
||||
%%======================================================================================
|
||||
%% Helper Functions
|
||||
%%======================================================================================
|
||||
|
|
|
@ -111,6 +111,10 @@
|
|||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
|
||||
-type action_resource_id() :: resource_id().
|
||||
-type connector_resource_id() :: resource_id().
|
||||
-type message_tag() :: action_resource_id().
|
||||
|
||||
-define(WORKER_POOL_SIZE, 16).
|
||||
|
||||
-define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
emqx_bridge_gcp_pubsub_producer_schema {
|
||||
|
||||
action_parameters.desc:
|
||||
"""Action specific configs."""
|
||||
action_parameters.label:
|
||||
"""Action"""
|
||||
|
||||
producer_action.desc:
|
||||
"""Action configs."""
|
||||
producer_action.label:
|
||||
"""Action"""
|
||||
|
||||
config_connector.desc:
|
||||
"""Configuration for a GCP PubSub Producer Client."""
|
||||
config_connector.label:
|
||||
"""GCP PubSub Producer Client Configuration"""
|
||||
|
||||
}
|
|
@ -6,4 +6,14 @@ desc_bridges_v2.desc:
|
|||
desc_bridges_v2.label:
|
||||
"""Bridge Configuration"""
|
||||
|
||||
mqtt_topic.desc:
|
||||
"""MQTT topic or topic filter as data source (action input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in the remote system."""
|
||||
mqtt_topic.label:
|
||||
"""Source MQTT Topic"""
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable (true) or disable (false) this action."""
|
||||
config_enable.label:
|
||||
"""Enable or Disable"""
|
||||
|
||||
}
|
||||
|
|
|
@ -2,15 +2,17 @@ emqx_connector_schema {
|
|||
|
||||
desc_connectors.desc:
|
||||
"""Connectors that are used to connect to external systems"""
|
||||
|
||||
desc_connectors.label:
|
||||
"""Connectors"""
|
||||
|
||||
|
||||
connector_field.desc:
|
||||
"""Name of connector used to connect to the resource where the action is to be performed."""
|
||||
|
||||
connector_field.label:
|
||||
"""Connector"""
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable (true) or disable (false) this connector."""
|
||||
config_enable.label:
|
||||
"""Enable or Disable"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue