feat: convert `gcp_pubsub_consumer` to connector/source

Fixes https://emqx.atlassian.net/browse/EMQX-11471
This commit is contained in:
Thales Macedo Garitezi 2024-02-06 16:19:16 -03:00
parent 866fea7648
commit 9a32895a1a
15 changed files with 719 additions and 199 deletions

View File

@ -88,6 +88,7 @@ hard_coded_action_info_modules_ee() ->
[ [
emqx_bridge_azure_event_hub_action_info, emqx_bridge_azure_event_hub_action_info,
emqx_bridge_confluent_producer_action_info, emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info, emqx_bridge_kafka_action_info,
emqx_bridge_kinesis_action_info, emqx_bridge_kinesis_action_info,

View File

@ -27,7 +27,9 @@
%% Note: this is strange right now, because it lives in `emqx_bridge_v2', but it shall be %% Note: this is strange right now, because it lives in `emqx_bridge_v2', but it shall be
%% refactored into a new module/application with appropriate name. %% refactored into a new module/application with appropriate name.
-define(ROOT_KEY_ACTIONS, actions). -define(ROOT_KEY_ACTIONS, actions).
-define(ROOT_KEY_ACTIONS_BIN, <<"actions">>).
-define(ROOT_KEY_SOURCES, sources). -define(ROOT_KEY_SOURCES, sources).
-define(ROOT_KEY_SOURCES_BIN, <<"sources">>).
%% Loading and unloading config when EMQX starts and stops %% Loading and unloading config when EMQX starts and stops
-export([ -export([
@ -91,6 +93,7 @@
-export([ -export([
id/2, id/2,
id/3, id/3,
source_id/3,
bridge_v1_is_valid/2, bridge_v1_is_valid/2,
bridge_v1_is_valid/3, bridge_v1_is_valid/3,
extract_connector_id_from_bridge_v2_id/1 extract_connector_id_from_bridge_v2_id/1
@ -475,7 +478,7 @@ augment_channel_config(
bridge_type => bin(BridgeV2Type), bridge_type => bin(BridgeV2Type),
bridge_name => bin(BridgeName) bridge_name => bin(BridgeName)
}, },
case emqx_action_info:is_source(BridgeV2Type) of case emqx_action_info:is_source(BridgeV2Type) andalso ConfigRoot =:= ?ROOT_KEY_SOURCES of
true -> true ->
BId = emqx_bridge_resource:bridge_id(BridgeV2Type, BridgeName), BId = emqx_bridge_resource:bridge_id(BridgeV2Type, BridgeName),
BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId), BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId),
@ -750,7 +753,9 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
not_found -> not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf -> ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1) create_dry_run_helper(
ensure_atom_root_key(ConfRootKey), Type, ConnectorRawConf, Conf1
)
end end
catch catch
%% validation errors %% validation errors
@ -758,9 +763,6 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
{error, Reason1} {error, Reason1}
end. end.
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
create_dry_run_helper(?ROOT_KEY_ACTIONS, BridgeType, ConnectorRawConf, BridgeV2RawConf).
create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) -> create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = connector_type(BridgeType), ConnectorType = connector_type(BridgeType),
@ -969,6 +971,9 @@ id(BridgeType, BridgeName) ->
id(BridgeType, BridgeName, ConnectorName) -> id(BridgeType, BridgeName, ConnectorName) ->
id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, ConnectorName). id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, ConnectorName).
source_id(BridgeType, BridgeName, ConnectorName) ->
id_with_root_name(?ROOT_KEY_SOURCES, BridgeType, BridgeName, ConnectorName).
id_with_root_name(RootName, BridgeType, BridgeName) -> id_with_root_name(RootName, BridgeType, BridgeName) ->
case lookup_conf(RootName, BridgeType, BridgeName) of case lookup_conf(RootName, BridgeType, BridgeName) of
#{connector := ConnectorName} -> #{connector := ConnectorName} ->
@ -1633,6 +1638,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
PreviousRawConf = undefined, PreviousRawConf = undefined,
try try
#{ #{
conf_root_key := ConfRootKey,
connector_type := _ConnectorType, connector_type := _ConnectorType,
connector_name := _NewConnectorName, connector_name := _NewConnectorName,
connector_conf := ConnectorRawConf, connector_conf := ConnectorRawConf,
@ -1640,7 +1646,9 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
bridge_v2_name := _BridgeName, bridge_v2_name := _BridgeName,
bridge_v2_conf := BridgeV2RawConf bridge_v2_conf := BridgeV2RawConf
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) create_dry_run_helper(
ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf
)
catch catch
throw:Reason -> throw:Reason ->
{error, Reason} {error, Reason}
@ -1858,6 +1866,13 @@ extract_connector_id_from_bridge_v2_id(Id) ->
error({error, iolist_to_binary(io_lib:format("Invalid action ID: ~p", [Id]))}) error({error, iolist_to_binary(io_lib:format("Invalid action ID: ~p", [Id]))})
end. end.
ensure_atom_root_key(ConfRootKey) when is_atom(ConfRootKey) ->
ConfRootKey;
ensure_atom_root_key(?ROOT_KEY_ACTIONS_BIN) ->
?ROOT_KEY_ACTIONS;
ensure_atom_root_key(?ROOT_KEY_SOURCES_BIN) ->
?ROOT_KEY_SOURCES.
to_existing_atom(X) -> to_existing_atom(X) ->
case emqx_utils:safe_to_existing_atom(X, utf8) of case emqx_utils:safe_to_existing_atom(X, utf8) of
{ok, A} -> A; {ok, A} -> A;

View File

@ -64,8 +64,9 @@
make_producer_action_schema/1, make_producer_action_schema/2, make_producer_action_schema/1, make_producer_action_schema/2,
make_consumer_action_schema/1, make_consumer_action_schema/2, make_consumer_action_schema/1, make_consumer_action_schema/2,
top_level_common_action_keys/0, top_level_common_action_keys/0,
top_level_common_source_keys/0,
project_to_actions_resource_opts/1, project_to_actions_resource_opts/1,
project_to_sources_resource_opts/1 project_to_sources_resource_opts/1, project_to_sources_resource_opts/2
]). ]).
-export([actions_convert_from_connectors/1]). -export([actions_convert_from_connectors/1]).
@ -422,6 +423,16 @@ top_level_common_action_keys() ->
<<"resource_opts">> <<"resource_opts">>
]. ].
top_level_common_source_keys() ->
[
<<"connector">>,
<<"tags">>,
<<"description">>,
<<"enable">>,
<<"parameters">>,
<<"resource_opts">>
].
%%====================================================================================== %%======================================================================================
%% Helper functions for making HOCON Schema %% Helper functions for making HOCON Schema
%%====================================================================================== %%======================================================================================
@ -474,7 +485,9 @@ project_to_actions_resource_opts(OldResourceOpts) ->
maps:with(Subfields, OldResourceOpts). maps:with(Subfields, OldResourceOpts).
project_to_sources_resource_opts(OldResourceOpts) -> project_to_sources_resource_opts(OldResourceOpts) ->
Subfields = common_source_resource_opts_subfields_bin(), project_to_sources_resource_opts(OldResourceOpts, common_source_resource_opts_subfields_bin()).
project_to_sources_resource_opts(OldResourceOpts, Subfields) ->
maps:with(Subfields, OldResourceOpts). maps:with(Subfields, OldResourceOpts).
actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) ->

View File

@ -105,9 +105,15 @@ parse_and_check(BridgeType, BridgeName, ConfigString) ->
BridgeConfig. BridgeConfig.
resource_id(Config) -> resource_id(Config) ->
BridgeKind = proplists:get_value(bridge_kind, Config, action),
ConfRootKey =
case BridgeKind of
action -> actions;
source -> sources
end,
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
BridgeName = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
emqx_bridge_resource:resource_id(BridgeType, BridgeName). emqx_bridge_resource:resource_id(ConfRootKey, BridgeType, BridgeName).
create_bridge(Config) -> create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}). create_bridge(Config, _Overrides = #{}).

View File

@ -8,7 +8,12 @@
emqx_resource, emqx_resource,
ehttpc ehttpc
]}, ]},
{env, [{emqx_action_info_modules, [emqx_bridge_gcp_pubsub_producer_action_info]}]}, {env, [
{emqx_action_info_modules, [
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info
]}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -180,17 +180,17 @@ query_async(
), ),
do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs). do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs).
-spec get_status(state()) -> connected | disconnected. -spec get_status(state()) -> ?status_connected | ?status_disconnected.
get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
true -> true ->
connected; ?status_connected;
false -> false ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "gcp_pubsub_bridge_get_status_failed", msg => "gcp_pubsub_bridge_get_status_failed",
state => State state => State
}), }),
disconnected ?status_disconnected
end. end.
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------

View File

@ -0,0 +1,87 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_consumer_action_info).
-behaviour(emqx_action_info).
-export([
is_source/0,
is_action/0,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
bridge_v1_config_to_action_config/2,
connector_action_config_to_bridge_v1_config/2
]).
is_source() -> true.
is_action() -> false.
bridge_v1_type_name() -> gcp_pubsub_consumer.
action_type_name() -> gcp_pubsub_consumer.
connector_type_name() -> gcp_pubsub_consumer.
schema_module() -> emqx_bridge_gcp_pubsub_consumer_schema.
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
CommonSourceKeys = emqx_bridge_v2_schema:top_level_common_source_keys(),
ParamsKeys = source_action_parameters_field_keys(),
Config1 = maps:with(CommonSourceKeys, BridgeV1Config),
ConsumerCfg = maps:get(<<"consumer">>, BridgeV1Config, #{}),
Params = maps:with(ParamsKeys, ConsumerCfg),
{source, gcp_pubsub_consumer,
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(Cfg) ->
emqx_bridge_v2_schema:project_to_sources_resource_opts(
Cfg,
resource_opts_fields()
)
end,
Config1#{
<<"connector">> => ConnectorName,
<<"parameters">> => Params
}
)}.
connector_action_config_to_bridge_v1_config(ConnectorConfig, SourceConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, SourceConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
BridgeV1Config3 =
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end,
BridgeV1Config2
),
emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config3).
%%------------------------------------------------------------------------------------------
%% Internal helper fns
%%------------------------------------------------------------------------------------------
resource_opts_fields() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_gcp_pubsub_consumer_schema:fields(source_resource_opts)
].
bridge_v1_resource_opts_fields() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_gcp_pubsub:fields("consumer_resource_opts")
].
source_action_parameters_field_keys() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_gcp_pubsub_consumer_schema:fields(source_parameters)
].
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

View File

@ -0,0 +1,253 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_consumer_schema).
-import(hoconsc, [mk/2, ref/2]).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
source_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
-define(CONNECTOR_TYPE, gcp_pubsub_consumer).
-define(SOURCE_TYPE, gcp_pubsub_consumer).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"gcp_pubsub_consumer".
roots() ->
[].
%%=========================================
%% Action fields
%%=========================================
fields(source) ->
{gcp_pubsub_consumer,
mk(
hoconsc:map(name, ref(?MODULE, consumer_source)),
#{
desc => <<"GCP PubSub Consumer Source Config">>,
required => false
}
)};
fields(consumer_source) ->
emqx_bridge_v2_schema:make_consumer_action_schema(
mk(
ref(?MODULE, source_parameters),
#{
required => true,
desc => ?DESC(consumer_source)
}
),
#{resource_opts_ref => ref(?MODULE, source_resource_opts)}
);
fields(source_parameters) ->
%% FIXME: check
emqx_bridge_gcp_pubsub:fields(consumer);
fields(source_resource_opts) ->
Fields = [
health_check_interval,
%% the workers pull the messages
request_ttl,
resume_interval
],
lists:filter(
fun({Key, _Sc}) -> lists:member(Key, Fields) end,
emqx_resource_schema:create_opts(
_Overrides = [
{health_check_interval, #{default => <<"30s">>}}
]
)
);
%%=========================================
%% Connector fields
%%=========================================
fields("config_connector") ->
%% FIXME
emqx_connector_schema:common_fields() ++
connector_config_fields();
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
%%=========================================
%% HTTP API fields: source
%%=========================================
fields(Field) when
Field == "get_source";
Field == "post_source";
Field == "put_source"
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(consumer_source));
%%=========================================
%% HTTP API fields: connector
%%=========================================
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, connector_config_fields()).
connector_config_fields() ->
emqx_bridge_gcp_pubsub:fields(connector_config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).
desc("config_connector") ->
?DESC("config_connector");
desc(source_parameters) ->
?DESC(source_parameters);
desc(consumer_source) ->
?DESC(consumer_source);
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(source_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
source_examples(Method) ->
[
#{
<<"gcp_pubsub_consumer">> => #{
summary => <<"GCP PubSub Consumer Source">>,
value => source_example(Method)
}
}
].
connector_examples(Method) ->
[
#{
<<"gcp_pubsub_consumer">> => #{
summary => <<"GCP PubSub Consumer Connector">>,
value => connector_example(Method)
}
}
].
conn_bridge_examples(Method) ->
emqx_bridge_gcp_pubsub:conn_bridge_examples(Method).
source_example(post) ->
maps:merge(
source_example(put),
#{
type => <<"gcp_pubsub_consumer">>,
name => <<"my_action">>
}
);
source_example(get) ->
maps:merge(
source_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
source_example(put) ->
#{
enable => true,
connector => <<"my_connector_name">>,
description => <<"My action">>,
local_topic => <<"local/topic">>,
resource_opts =>
#{batch_size => 5},
parameters =>
#{
pubsub_topic => <<"mytopic">>,
ordering_key_template => <<"${payload.ok}">>,
payload_template => <<"${payload}">>,
attributes_template =>
[
#{
key => <<"${payload.attrs.k}">>,
value => <<"${payload.attrs.v}">>
}
]
}
}.
connector_example(get) ->
maps:merge(
connector_example(post),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
}
);
connector_example(post) ->
maps:merge(
connector_example(put),
#{
type => <<"gcp_pubsub_producer">>,
name => <<"my_connector">>
}
);
connector_example(put) ->
%% FIXME: revisit
#{
enable => true,
connect_timeout => <<"10s">>,
pool_size => 8,
pipelining => 100,
max_retries => 2,
resource_opts => #{request_ttl => <<"60s">>},
service_account_json =>
#{
auth_provider_x509_cert_url =>
<<"https://www.googleapis.com/oauth2/v1/certs">>,
auth_uri =>
<<"https://accounts.google.com/o/oauth2/auth">>,
client_email =>
<<"test@myproject.iam.gserviceaccount.com">>,
client_id => <<"123812831923812319190">>,
client_x509_cert_url =>
<<
"https://www.googleapis.com/robot/v1/"
"metadata/x509/test%40myproject.iam.gserviceaccount.com"
>>,
private_key =>
<<
"-----BEGIN PRIVATE KEY-----\n"
"MIIEvQI..."
>>,
private_key_id => <<"kid">>,
project_id => <<"myproject">>,
token_uri =>
<<"https://oauth2.googleapis.com/token">>,
type => <<"service_account">>
}
}.

View File

@ -37,8 +37,9 @@
client := emqx_bridge_gcp_pubsub_client:state(), client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id => non_neg_integer(), ecpool_worker_id => non_neg_integer(),
forget_interval := duration(), forget_interval := duration(),
hookpoint := binary(), hookpoints := [binary()],
instance_id := binary(), connector_resource_id := binary(),
source_resource_id := binary(),
mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
project_id := emqx_bridge_gcp_pubsub_client:project_id(), project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pull_max_messages := non_neg_integer(), pull_max_messages := non_neg_integer(),
@ -55,8 +56,9 @@
client := emqx_bridge_gcp_pubsub_client:state(), client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id := non_neg_integer(), ecpool_worker_id := non_neg_integer(),
forget_interval := duration(), forget_interval := duration(),
hookpoint := binary(), hookpoints := [binary()],
instance_id := binary(), connector_resource_id := binary(),
source_resource_id := binary(),
mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
pending_acks := #{message_id() => ack_id()}, pending_acks := #{message_id() => ack_id()},
project_id := emqx_bridge_gcp_pubsub_client:project_id(), project_id := emqx_bridge_gcp_pubsub_client:project_id(),
@ -97,7 +99,7 @@ ensure_subscription(WorkerPid) ->
gen_server:cast(WorkerPid, ensure_subscription). gen_server:cast(WorkerPid, ensure_subscription).
-spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok. -spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok.
reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) -> reply_delegator(WorkerPid, pull_async = _Action, SourceResId, Result) ->
?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}), ?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}),
case Result of case Result of
{error, timeout} -> {error, timeout} ->
@ -107,7 +109,7 @@ reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) ->
warning, warning,
"gcp_pubsub_consumer_worker_pull_error", "gcp_pubsub_consumer_worker_pull_error",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
reason => Reason reason => Reason
} }
), ),
@ -156,8 +158,9 @@ connect(Opts0) ->
client := Client, client := Client,
ecpool_worker_id := WorkerId, ecpool_worker_id := WorkerId,
forget_interval := ForgetInterval, forget_interval := ForgetInterval,
hookpoint := Hookpoint, hookpoints := Hookpoints,
instance_id := InstanceId, connector_resource_id := ConnectorResId,
source_resource_id := SourceResId,
project_id := ProjectId, project_id := ProjectId,
pull_max_messages := PullMaxMessages, pull_max_messages := PullMaxMessages,
pull_retry_interval := PullRetryInterval, pull_retry_interval := PullRetryInterval,
@ -175,8 +178,9 @@ connect(Opts0) ->
%% workers. %% workers.
client => Client, client => Client,
forget_interval => ForgetInterval, forget_interval => ForgetInterval,
hookpoint => Hookpoint, hookpoints => Hookpoints,
instance_id => InstanceId, connector_resource_id => ConnectorResId,
source_resource_id => SourceResId,
mqtt_config => MQTTConfig, mqtt_config => MQTTConfig,
project_id => ProjectId, project_id => ProjectId,
pull_max_messages => PullMaxMessages, pull_max_messages => PullMaxMessages,
@ -185,6 +189,7 @@ connect(Opts0) ->
topic => Topic, topic => Topic,
subscription_id => subscription_id(BridgeName, Topic) subscription_id => subscription_id(BridgeName, Topic)
}, },
?tp(gcp_pubsub_consumer_worker_about_to_spawn, #{}),
start_link(Config). start_link(Config).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -209,13 +214,13 @@ handle_continue(?ensure_subscription, State0) ->
already_exists -> already_exists ->
{noreply, State0, {continue, ?patch_subscription}}; {noreply, State0, {continue, ?patch_subscription}};
continue -> continue ->
#{instance_id := InstanceId} = State0, #{source_resource_id := SourceResId} = State0,
?MODULE:pull_async(self()), ?MODULE:pull_async(self()),
optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
?tp( ?tp(
debug, debug,
"gcp_pubsub_consumer_worker_subscription_ready", "gcp_pubsub_consumer_worker_subscription_ready",
#{instance_id => InstanceId} #{instance_id => SourceResId}
), ),
{noreply, State0}; {noreply, State0};
retry -> retry ->
@ -232,13 +237,13 @@ handle_continue(?patch_subscription, State0) ->
?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}), ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
case patch_subscription(State0) of case patch_subscription(State0) of
ok -> ok ->
#{instance_id := InstanceId} = State0, #{source_resource_id := SourceResId} = State0,
?MODULE:pull_async(self()), ?MODULE:pull_async(self()),
optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
?tp( ?tp(
debug, debug,
"gcp_pubsub_consumer_worker_subscription_ready", "gcp_pubsub_consumer_worker_subscription_ready",
#{instance_id => InstanceId} #{instance_id => SourceResId}
), ),
{noreply, State0}; {noreply, State0};
error -> error ->
@ -292,13 +297,13 @@ handle_info({forget_message_ids, MsgIds}, State0) ->
{noreply, State}; {noreply, State};
handle_info(Msg, State0) -> handle_info(Msg, State0) ->
#{ #{
instance_id := InstanceId, source_resource_id := SoureceResId,
topic := Topic topic := Topic
} = State0, } = State0,
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "gcp_pubsub_consumer_worker_unexpected_message", msg => "gcp_pubsub_consumer_worker_unexpected_message",
unexpected_msg => Msg, unexpected_msg => Msg,
instance_id => InstanceId, instance_id => SoureceResId,
topic => Topic topic => Topic
}), }),
{noreply, State0}. {noreply, State0}.
@ -309,11 +314,11 @@ terminate({error, Reason}, State) when
Reason =:= permission_denied Reason =:= permission_denied
-> ->
#{ #{
instance_id := InstanceId, source_resource_id := SourceResId,
topic := _Topic topic := _Topic
} = State, } = State,
optvar:unset(?OPTVAR_SUB_OK(self())), optvar:unset(?OPTVAR_SUB_OK(self())),
emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(InstanceId, Reason), emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(SourceResId, Reason),
?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}), ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}),
ok; ok;
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
@ -351,7 +356,7 @@ ensure_subscription_exists(State) ->
?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}), ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
#{ #{
client := Client, client := Client,
instance_id := InstanceId, source_resource_id := SourceResId,
request_ttl := RequestTTL, request_ttl := RequestTTL,
subscription_id := SubscriptionId, subscription_id := SubscriptionId,
topic := Topic topic := Topic
@ -369,7 +374,7 @@ ensure_subscription_exists(State) ->
debug, debug,
"gcp_pubsub_consumer_worker_subscription_already_exists", "gcp_pubsub_consumer_worker_subscription_already_exists",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic, topic => Topic,
subscription_id => SubscriptionId subscription_id => SubscriptionId
} }
@ -381,7 +386,7 @@ ensure_subscription_exists(State) ->
warning, warning,
"gcp_pubsub_consumer_worker_nonexistent_topic", "gcp_pubsub_consumer_worker_nonexistent_topic",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic topic => Topic
} }
), ),
@ -392,7 +397,7 @@ ensure_subscription_exists(State) ->
warning, warning,
"gcp_pubsub_consumer_worker_permission_denied", "gcp_pubsub_consumer_worker_permission_denied",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic topic => Topic
} }
), ),
@ -403,7 +408,7 @@ ensure_subscription_exists(State) ->
warning, warning,
"gcp_pubsub_consumer_worker_bad_credentials", "gcp_pubsub_consumer_worker_bad_credentials",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic topic => Topic
} }
), ),
@ -413,7 +418,7 @@ ensure_subscription_exists(State) ->
debug, debug,
"gcp_pubsub_consumer_worker_subscription_created", "gcp_pubsub_consumer_worker_subscription_created",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic, topic => Topic,
subscription_id => SubscriptionId subscription_id => SubscriptionId
} }
@ -424,7 +429,7 @@ ensure_subscription_exists(State) ->
error, error,
"gcp_pubsub_consumer_worker_subscription_error", "gcp_pubsub_consumer_worker_subscription_error",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic, topic => Topic,
reason => Reason reason => Reason
} }
@ -436,7 +441,7 @@ ensure_subscription_exists(State) ->
patch_subscription(State) -> patch_subscription(State) ->
#{ #{
client := Client, client := Client,
instance_id := InstanceId, source_resource_id := SourceResId,
subscription_id := SubscriptionId, subscription_id := SubscriptionId,
request_ttl := RequestTTL, request_ttl := RequestTTL,
topic := Topic topic := Topic
@ -453,7 +458,7 @@ patch_subscription(State) ->
debug, debug,
"gcp_pubsub_consumer_worker_subscription_patched", "gcp_pubsub_consumer_worker_subscription_patched",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic, topic => Topic,
subscription_id => SubscriptionId, subscription_id => SubscriptionId,
result => Res result => Res
@ -465,7 +470,7 @@ patch_subscription(State) ->
warning, warning,
"gcp_pubsub_consumer_worker_subscription_patch_error", "gcp_pubsub_consumer_worker_subscription_patch_error",
#{ #{
instance_id => InstanceId, instance_id => SourceResId,
topic => Topic, topic => Topic,
subscription_id => SubscriptionId, subscription_id => SubscriptionId,
reason => Reason reason => Reason
@ -483,7 +488,7 @@ do_pull_async(State0) ->
begin begin
#{ #{
client := Client, client := Client,
instance_id := InstanceId, source_resource_id := SourceResId,
request_ttl := RequestTTL request_ttl := RequestTTL
} = State0, } = State0,
Method = post, Method = post,
@ -491,7 +496,7 @@ do_pull_async(State0) ->
Body = body(State0, pull), Body = body(State0, pull),
ReqOpts = #{request_ttl => RequestTTL}, ReqOpts = #{request_ttl => RequestTTL},
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, SourceResId]},
Res = emqx_bridge_gcp_pubsub_client:query_async( Res = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest, PreparedRequest,
ReplyFunAndArgs, ReplyFunAndArgs,
@ -734,8 +739,8 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
#{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}, #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId},
begin begin
#{ #{
instance_id := InstanceId, source_resource_id := SourceResId,
hookpoint := Hookpoint, hookpoints := Hookpoints,
mqtt_config := #{ mqtt_config := #{
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
qos := MQTTQoS, qos := MQTTQoS,
@ -764,11 +769,15 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
{<<"orderingKey">>, ordering_key} {<<"orderingKey">>, ordering_key}
] ]
), ),
%% TODO: this should be optional
Payload = render(FullMessage, PayloadTemplate), Payload = render(FullMessage, PayloadTemplate),
MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
_ = emqx:publish(MQTTMessage), _ = emqx:publish(MQTTMessage),
emqx_hooks:run(Hookpoint, [FullMessage]), lists:foreach(
emqx_resource_metrics:received_inc(InstanceId), fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end,
Hookpoints
),
emqx_resource_metrics:received_inc(SourceResId),
ok ok
end end
). ).

View File

@ -12,7 +12,12 @@
query_mode/1, query_mode/1,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_get_status/2 on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
%% health check API %% health check API
@ -31,7 +36,7 @@
qos := emqx_types:qos(), qos := emqx_types:qos(),
payload_template := string() payload_template := string()
}. }.
-type config() :: #{ -type connector_config() :: #{
connect_timeout := emqx_schema:duration_ms(), connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
pool_size := non_neg_integer(), pool_size := non_neg_integer(),
@ -39,9 +44,27 @@
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
any() => term() any() => term()
}. }.
-type state() :: #{ -type connector_state() :: #{
client := emqx_bridge_gcp_pubsub_client:state() client := emqx_bridge_gcp_pubsub_client:state(),
installed_sources := #{source_resource_id() => source_state()},
project_id := binary()
}. }.
-type topic_mapping() :: #{
pubsub_topic := binary(),
mqtt_topic := binary(),
qos := emqx_types:qos(),
payload_template := binary()
}.
-type source_config() :: #{
bridge_name := binary(),
hookpoints := [binary()],
parameters := #{
consumer_workers_per_topic := pos_integer(),
topic_mapping := [topic_mapping(), ...]
},
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}
}.
-type source_state() :: #{}.
-export_type([mqtt_config/0]). -export_type([mqtt_config/0]).
@ -67,38 +90,91 @@ callback_mode() -> async_if_possible.
-spec query_mode(any()) -> query_mode(). -spec query_mode(any()) -> query_mode().
query_mode(_Config) -> no_queries. query_mode(_Config) -> no_queries.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -spec on_start(connector_resource_id(), connector_config()) ->
on_start(InstanceId, Config0) -> {ok, connector_state()} | {error, term()}.
on_start(ConnectorResId, Config0) ->
%% ensure it's a binary key map %% ensure it's a binary key map
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of #{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of
{ok, Client} -> {ok, Client} ->
start_consumers(InstanceId, Client, Config); ConnectorState = #{
client => Client,
installed_sources => #{},
project_id => ProjectId
},
{ok, ConnectorState};
Error -> Error ->
Error Error
end. end.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}. -spec on_stop(resource_id(), connector_state()) -> ok | {error, term()}.
on_stop(InstanceId, _State) -> on_stop(ConnectorResId, ConnectorState) ->
?tp(gcp_pubsub_consumer_stop_enter, #{}), ?tp(gcp_pubsub_consumer_stop_enter, #{}),
clear_unhealthy(InstanceId), clear_unhealthy(ConnectorState),
ok = stop_consumers(InstanceId), ok = stop_consumers(ConnectorState),
emqx_bridge_gcp_pubsub_client:stop(InstanceId). emqx_bridge_gcp_pubsub_client:stop(ConnectorResId).
-spec on_get_status(resource_id(), state()) -> connected | connecting | {disconnected, state(), _}. -spec on_get_status(resource_id(), connector_state()) ->
on_get_status(InstanceId, State) -> ?status_connected | ?status_connecting.
on_get_status(_ConnectorResId, ConnectorState) ->
#{client := Client} = ConnectorState,
get_client_status(Client).
-spec on_add_channel(
connector_resource_id(),
connector_state(),
source_resource_id(),
source_config()
) ->
{ok, connector_state()}.
on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
#{
client := Client,
installed_sources := InstalledSources0,
project_id := ProjectId
} = ConnectorState0,
case start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) of
{ok, SourceState} ->
InstalledSources = InstalledSources0#{SourceResId => SourceState},
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
{ok, ConnectorState};
Error = {error, _} ->
Error
end.
-spec on_remove_channel(
connector_resource_id(),
connector_state(),
source_resource_id()
) ->
{ok, connector_state()}.
on_remove_channel(_ConnectorResId, ConnectorState0, SourceResId) ->
#{installed_sources := InstalledSources0} = ConnectorState0,
InstalledSources = maps:remove(SourceResId, InstalledSources0),
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
{ok, ConnectorState}.
-spec on_get_channels(connector_resource_id()) ->
[{action_resource_id(), source_config()}].
on_get_channels(ConnectorResId) ->
emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
-spec on_get_channel_status(connector_resource_id(), source_resource_id(), connector_state()) ->
health_check_status().
on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) ->
%% We need to check this flag separately because the workers might be gone when we %% We need to check this flag separately because the workers might be gone when we
%% check them. %% check them.
case check_if_unhealthy(InstanceId) of case check_if_unhealthy(SourceResId) of
{error, topic_not_found} -> {error, topic_not_found} ->
{disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
{error, permission_denied} -> {error, permission_denied} ->
{disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; {?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}};
{error, bad_credentials} -> {error, bad_credentials} ->
{disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; {?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}};
ok -> ok ->
#{client := Client} = State, #{client := Client} = ConnectorState,
check_workers(InstanceId, Client) check_workers(SourceResId, Client)
end. end.
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -106,29 +182,35 @@ on_get_status(InstanceId, State) ->
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec mark_as_unhealthy( -spec mark_as_unhealthy(
resource_id(), source_resource_id(),
topic_not_found topic_not_found
| permission_denied | permission_denied
| bad_credentials | bad_credentials
) -> ok. ) -> ok.
mark_as_unhealthy(InstanceId, Reason) -> mark_as_unhealthy(SourceResId, Reason) ->
optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason), optvar:set(?OPTVAR_UNHEALTHY(SourceResId), Reason),
ok. ok.
-spec clear_unhealthy(resource_id()) -> ok. -spec clear_unhealthy(connector_state()) -> ok.
clear_unhealthy(InstanceId) -> clear_unhealthy(ConnectorState) ->
optvar:unset(?OPTVAR_UNHEALTHY(InstanceId)), #{installed_sources := InstalledSources} = ConnectorState,
maps:foreach(
fun(SourceResId, _SourceState) ->
optvar:unset(?OPTVAR_UNHEALTHY(SourceResId))
end,
InstalledSources
),
?tp(gcp_pubsub_consumer_clear_unhealthy, #{}), ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}),
ok. ok.
-spec check_if_unhealthy(resource_id()) -> -spec check_if_unhealthy(source_resource_id()) ->
ok ok
| {error, | {error,
topic_not_found topic_not_found
| permission_denied | permission_denied
| bad_credentials}. | bad_credentials}.
check_if_unhealthy(InstanceId) -> check_if_unhealthy(SourceResId) ->
case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of case optvar:peek(?OPTVAR_UNHEALTHY(SourceResId)) of
{ok, Reason} -> {ok, Reason} ->
{error, Reason}; {error, Reason};
undefined -> undefined ->
@ -139,14 +221,13 @@ check_if_unhealthy(InstanceId) ->
%% Internal fns %% Internal fns
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
start_consumers(InstanceId, Client, Config) -> start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) ->
#{ #{
bridge_name := BridgeName, bridge_name := BridgeName,
consumer := ConsumerConfig0, parameters := ConsumerConfig0,
hookpoint := Hookpoint, hookpoints := Hookpoints,
resource_opts := #{request_ttl := RequestTTL}, resource_opts := #{request_ttl := RequestTTL}
service_account_json := #{<<"project_id">> := ProjectId} } = SourceConfig,
} = Config,
ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
TopicMapping = maps:get(topic_mapping, ConsumerConfig1), TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1), ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1),
@ -156,8 +237,9 @@ start_consumers(InstanceId, Client, Config) ->
bridge_name => BridgeName, bridge_name => BridgeName,
client => Client, client => Client,
forget_interval => forget_interval(RequestTTL), forget_interval => forget_interval(RequestTTL),
hookpoint => Hookpoint, hookpoints => Hookpoints,
instance_id => InstanceId, connector_resource_id => ConnectorResId,
source_resource_id => SourceResId,
pool_size => PoolSize, pool_size => PoolSize,
project_id => ProjectId, project_id => ProjectId,
pull_retry_interval => RequestTTL, pull_retry_interval => RequestTTL,
@ -169,17 +251,14 @@ start_consumers(InstanceId, Client, Config) ->
ok -> ok ->
ok; ok;
{error, not_found} -> {error, not_found} ->
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
throw( throw(
{unhealthy_target, ?TOPIC_MESSAGE} {unhealthy_target, ?TOPIC_MESSAGE}
); );
{error, permission_denied} -> {error, permission_denied} ->
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
throw( throw(
{unhealthy_target, ?PERMISSION_MESSAGE} {unhealthy_target, ?PERMISSION_MESSAGE}
); );
{error, bad_credentials} -> {error, bad_credentials} ->
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
throw( throw(
{unhealthy_target, ?PERMISSION_MESSAGE} {unhealthy_target, ?PERMISSION_MESSAGE}
); );
@ -190,30 +269,34 @@ start_consumers(InstanceId, Client, Config) ->
ok ok
end, end,
case case
emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts) emqx_resource_pool:start(SourceResId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
of of
ok -> ok ->
State = #{ State = #{
client => Client, client => Client,
pool_name => InstanceId pool_name => SourceResId
}, },
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
{error, Reason} {error, Reason}
end. end.
stop_consumers(InstanceId) -> stop_consumers(ConnectorState) ->
_ = log_when_error( #{installed_sources := InstalledSources} = ConnectorState,
fun() -> maps:foreach(
ok = emqx_resource_pool:stop(InstanceId) fun(SourceResId, _SourceState) ->
_ = log_when_error(
fun() ->
ok = emqx_resource_pool:stop(SourceResId)
end,
#{
msg => "failed_to_stop_pull_worker_pool",
instance_id => SourceResId
}
)
end, end,
#{ InstalledSources
msg => "failed_to_stop_pull_worker_pool", ).
instance_id => InstanceId
}
),
ok.
convert_topic_mapping(TopicMappingList) -> convert_topic_mapping(TopicMappingList) ->
lists:foldl( lists:foldl(
@ -268,35 +351,37 @@ check_for_topic_existence(Topic, Client, ReqOpts) ->
{error, Reason} {error, Reason}
end. end.
-spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. -spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) ->
?status_connected | ?status_connecting.
get_client_status(Client) -> get_client_status(Client) ->
case emqx_bridge_gcp_pubsub_client:get_status(Client) of case emqx_bridge_gcp_pubsub_client:get_status(Client) of
disconnected -> connecting; ?status_disconnected -> ?status_connecting;
connected -> connected ?status_connected -> ?status_connected
end. end.
-spec check_workers(resource_id(), emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. -spec check_workers(source_resource_id(), emqx_bridge_gcp_pubsub_client:state()) ->
check_workers(InstanceId, Client) -> ?status_connected | ?status_connecting.
check_workers(SourceResId, Client) ->
case case
emqx_resource_pool:health_check_workers( emqx_resource_pool:health_check_workers(
InstanceId, SourceResId,
fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1, fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1,
emqx_resource_pool:health_check_timeout(), emqx_resource_pool:health_check_timeout(),
#{return_values => true} #{return_values => true}
) )
of of
{ok, []} -> {ok, []} ->
connecting; ?status_connecting;
{ok, Values} -> {ok, Values} ->
AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values), AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values),
case AllOk of case AllOk of
true -> true ->
get_client_status(Client); get_client_status(Client);
false -> false ->
connecting ?status_connecting
end; end;
{error, _} -> {error, _} ->
connecting ?status_connecting
end. end.
log_when_error(Fun, Log) -> log_when_error(Fun, Log) ->

View File

@ -13,6 +13,8 @@
-define(BRIDGE_TYPE, gcp_pubsub_consumer). -define(BRIDGE_TYPE, gcp_pubsub_consumer).
-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>). -define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_consumer">>).
-define(SOURCE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
-define(REPUBLISH_TOPIC, <<"republish/t">>). -define(REPUBLISH_TOPIC, <<"republish/t">>).
-define(PREPARED_REQUEST(METHOD, PATH, BODY), -define(PREPARED_REQUEST(METHOD, PATH, BODY),
{prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}} {prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}}
@ -147,6 +149,7 @@ common_init_per_testcase(TestCase, Config0) ->
ensure_topics(Config), ensure_topics(Config),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[ [
{bridge_kind, source},
{bridge_type, ?BRIDGE_TYPE}, {bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name}, {bridge_name, Name},
{bridge_config, ConsumerConfig}, {bridge_config, ConsumerConfig},
@ -157,18 +160,13 @@ common_init_per_testcase(TestCase, Config0) ->
]. ].
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of ProxyHost = ?config(proxy_host, Config),
true -> ProxyPort = ?config(proxy_port, Config),
ok; emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
false -> emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ProxyHost = ?config(proxy_host, Config), emqx_common_test_helpers:call_janitor(60_000),
ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok.
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
@ -421,9 +419,22 @@ start_and_subscribe_mqtt(Config) ->
ok. ok.
resource_id(Config) -> resource_id(Config) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(consumer_name, Config), Name = ?config(consumer_name, Config),
emqx_bridge_resource:resource_id(Type, Name). emqx_bridge_v2:source_id(?SOURCE_TYPE_BIN, Name, Name).
connector_resource_id(Config) ->
Name = ?config(consumer_name, Config),
emqx_connector_resource:resource_id(?CONNECTOR_TYPE_BIN, Name).
health_check(Config) ->
#{status := Status} = health_check_channel(Config),
{ok, Status}.
health_check_channel(Config) ->
Name = ?config(consumer_name, Config),
ConnectorResId = emqx_connector_resource:resource_id(?CONNECTOR_TYPE_BIN, Name),
SourceResId = resource_id(Config),
emqx_resource_manager:channel_health_check(ConnectorResId, SourceResId).
bridge_id(Config) -> bridge_id(Config) ->
Type = ?BRIDGE_TYPE_BIN, Type = ?BRIDGE_TYPE_BIN,
@ -564,7 +575,7 @@ get_pull_worker_pids(Config) ->
Pids. Pids.
get_async_worker_pids(Config) -> get_async_worker_pids(Config) ->
ResourceId = resource_id(Config), ResourceId = connector_resource_id(Config),
Pids = Pids =
[ [
AsyncWorkerPid AsyncWorkerPid
@ -902,7 +913,6 @@ unauthenticated_response() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_start_stop(Config) -> t_start_stop(Config) ->
ResourceId = resource_id(Config),
[#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config),
?check_trace( ?check_trace(
begin begin
@ -913,7 +923,7 @@ t_start_stop(Config) ->
), ),
?assertMatch({ok, _}, create_bridge(Config)), ?assertMatch({ok, _}, create_bridge(Config)),
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connected}, health_check(Config)),
?assertMatch(ok, remove_bridge(Config)), ?assertMatch(ok, remove_bridge(Config)),
ok ok
@ -1045,8 +1055,8 @@ t_consume_ok(Config) ->
), ),
%% Check that the bridge probe API doesn't leak atoms. %% Check that the bridge probe API doesn't leak atoms.
ProbeRes0 = probe_bridge_api(Config), ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, probe_bridge_api(Config)),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, probe_bridge_api(Config)),
AtomsBefore = erlang:system_info(atom_count), AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms. %% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(Config), ProbeRes1 = probe_bridge_api(Config),
@ -1143,13 +1153,12 @@ t_bridge_rule_action_source(Config) ->
ok. ok.
t_on_get_status(Config) -> t_on_get_status(Config) ->
ResourceId = resource_id(Config),
emqx_bridge_testlib:t_on_get_status(Config, #{failure_status => connecting}), emqx_bridge_testlib:t_on_get_status(Config, #{failure_status => connecting}),
%% no workers alive %% no workers alive
?retry( ?retry(
_Interval0 = 200, _Interval0 = 200,
_NAttempts0 = 20, _NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertMatch({ok, connected}, health_check(Config))
), ),
WorkerPids = get_pull_worker_pids(Config), WorkerPids = get_pull_worker_pids(Config),
emqx_utils:pmap( emqx_utils:pmap(
@ -1163,7 +1172,7 @@ t_on_get_status(Config) ->
end, end,
WorkerPids WorkerPids
), ),
?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connecting}, health_check(Config)),
ok. ok.
t_create_update_via_http_api(Config) -> t_create_update_via_http_api(Config) ->
@ -1331,7 +1340,6 @@ t_multiple_pull_workers(Config) ->
t_nonexistent_topic(Config) -> t_nonexistent_topic(Config) ->
BridgeName = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
[Mapping0] = ?config(topic_mapping, Config), [Mapping0] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
PubSubTopic = <<"nonexistent-", (emqx_guid:to_hexstr(emqx_guid:gen()))/binary>>, PubSubTopic = <<"nonexistent-", (emqx_guid:to_hexstr(emqx_guid:gen()))/binary>>,
TopicMapping0 = [Mapping0#{pubsub_topic := PubSubTopic}], TopicMapping0 = [Mapping0#{pubsub_topic := PubSubTopic}],
TopicMapping = emqx_utils_maps:binary_key_map(TopicMapping0), TopicMapping = emqx_utils_maps:binary_key_map(TopicMapping0),
@ -1347,11 +1355,14 @@ t_nonexistent_topic(Config) ->
), ),
?assertMatch( ?assertMatch(
{ok, disconnected}, {ok, disconnected},
emqx_resource_manager:health_check(ResourceId) health_check(Config)
), ),
?assertMatch( ?assertMatch(
{ok, _Group, #{error := {unhealthy_target, "GCP PubSub topics are invalid" ++ _}}}, #{
emqx_resource_manager:lookup_cached(ResourceId) status := disconnected,
error := {unhealthy_target, "GCP PubSub topics are invalid" ++ _}
},
health_check_channel(Config)
), ),
%% now create the topic and restart the bridge %% now create the topic and restart the bridge
ensure_topic(Config, PubSubTopic), ensure_topic(Config, PubSubTopic),
@ -1362,11 +1373,14 @@ t_nonexistent_topic(Config) ->
?retry( ?retry(
_Interval0 = 200, _Interval0 = 200,
_NAttempts0 = 20, _NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertMatch({ok, connected}, health_check(Config))
), ),
?assertMatch( ?assertMatch(
{ok, _Group, #{error := undefined}}, #{
emqx_resource_manager:lookup_cached(ResourceId) status := connected,
error := undefined
},
health_check_channel(Config)
), ),
ok ok
end, end,
@ -1383,7 +1397,6 @@ t_nonexistent_topic(Config) ->
t_topic_deleted_while_consumer_is_running(Config) -> t_topic_deleted_while_consumer_is_running(Config) ->
TopicMapping = [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), TopicMapping = [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config),
NTopics = length(TopicMapping), NTopics = length(TopicMapping),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
{ok, SRef0} = {ok, SRef0} =
@ -1395,7 +1408,7 @@ t_topic_deleted_while_consumer_is_running(Config) ->
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connected}, health_check(Config)),
%% curiously, gcp pubsub doesn't seem to return any errors from the %% curiously, gcp pubsub doesn't seem to return any errors from the
%% subscription if the topic is deleted while the subscription still exists... %% subscription if the topic is deleted while the subscription still exists...
@ -1421,7 +1434,6 @@ t_connection_down_before_starting(Config) ->
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
@ -1433,13 +1445,13 @@ t_connection_down_before_starting(Config) ->
10_000 10_000
) )
), ),
?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, disconnected}, health_check(Config)),
ok ok
end), end),
?retry( ?retry(
_Interval0 = 200, _Interval0 = 200,
_NAttempts0 = 20, _NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertMatch({ok, connected}, health_check(Config))
), ),
ok ok
end, end,
@ -1451,7 +1463,6 @@ t_connection_timeout_before_starting(Config) ->
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
emqx_common_test_helpers:with_failure( emqx_common_test_helpers:with_failure(
@ -1464,14 +1475,14 @@ t_connection_timeout_before_starting(Config) ->
10_000 10_000
) )
), ),
?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connecting}, health_check(Config)),
ok ok
end end
), ),
?retry( ?retry(
_Interval0 = 200, _Interval0 = 200,
_NAttempts0 = 20, _NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertMatch({ok, connected}, health_check(Config))
), ),
ok ok
end, end,
@ -1480,7 +1491,6 @@ t_connection_timeout_before_starting(Config) ->
ok. ok.
t_pull_worker_death(Config) -> t_pull_worker_death(Config) ->
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
?assertMatch( ?assertMatch(
@ -1500,13 +1510,13 @@ t_pull_worker_death(Config) ->
ok ok
after 500 -> ct:fail("pull worker didn't die") after 500 -> ct:fail("pull worker didn't die")
end, end,
?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connecting}, health_check(Config)),
%% recovery %% recovery
?retry( ?retry(
_Interval0 = 200, _Interval0 = 200,
_NAttempts0 = 20, _NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertMatch({ok, connected}, health_check(Config))
), ),
ok ok
@ -1641,19 +1651,32 @@ t_connection_error_while_creating_subscription(Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
?check_trace( ?check_trace(
begin begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?force_ordering(
%% check retries #{?snk_kind := gcp_pubsub_consumer_worker_init},
{ok, SRef0} = #{?snk_kind := will_cut_connection}
snabbkaffe:subscribe( ),
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}), ?force_ordering(
_NEvents0 = 2, #{?snk_kind := connection_down},
10_000 #{?snk_kind := gcp_pubsub_consumer_worker_create_subscription_enter}
), ),
{ok, _} = create_bridge(Config), spawn_link(fun() ->
{ok, _} = snabbkaffe:receive_events(SRef0), ?tp(notice, will_cut_connection, #{}),
ok emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort),
?tp(notice, connection_down, #{})
end), end),
%% eventually succeeds %% check retries
{ok, SRef0} =
snabbkaffe:subscribe(
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}),
_NEvents0 = 2,
10_000
),
{ok, _} = create_bridge(Config),
{ok, _} = snabbkaffe:receive_events(SRef0),
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
%% should eventually succeed
?tp(notice, "waiting for recovery", #{}),
{ok, _} = {ok, _} =
?block_until( ?block_until(
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"},
@ -1763,7 +1786,6 @@ t_subscription_patch_error(Config) ->
t_topic_deleted_while_creating_subscription(Config) -> t_topic_deleted_while_creating_subscription(Config) ->
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
?force_ordering( ?force_ordering(
@ -1787,7 +1809,7 @@ t_topic_deleted_while_creating_subscription(Config) ->
#{?snk_kind := gcp_pubsub_consumer_worker_terminate}, #{?snk_kind := gcp_pubsub_consumer_worker_terminate},
10_000 10_000
), ),
?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, disconnected}, health_check(Config)),
ok ok
end, end,
[] []
@ -1797,7 +1819,6 @@ t_topic_deleted_while_creating_subscription(Config) ->
t_topic_deleted_while_patching_subscription(Config) -> t_topic_deleted_while_patching_subscription(Config) ->
BridgeName = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
@ -1832,7 +1853,7 @@ t_topic_deleted_while_patching_subscription(Config) ->
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000 10_000
), ),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connected}, health_check(Config)),
ok ok
end, end,
[] []
@ -1840,7 +1861,6 @@ t_topic_deleted_while_patching_subscription(Config) ->
ok. ok.
t_subscription_deleted_while_consumer_is_running(Config) -> t_subscription_deleted_while_consumer_is_running(Config) ->
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
{{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} =
@ -1870,7 +1890,7 @@ t_subscription_deleted_while_consumer_is_running(Config) ->
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch({ok, connected}, health_check(Config)),
ok ok
end, end,
[] []
@ -1880,7 +1900,6 @@ t_subscription_deleted_while_consumer_is_running(Config) ->
t_subscription_and_topic_deleted_while_consumer_is_running(Config) -> t_subscription_and_topic_deleted_while_consumer_is_running(Config) ->
ct:timetrap({seconds, 90}), ct:timetrap({seconds, 90}),
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
{{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} =
@ -1896,7 +1915,11 @@ t_subscription_and_topic_deleted_while_consumer_is_running(Config) ->
delete_subscription(Config, SubscriptionId), delete_subscription(Config, SubscriptionId),
{ok, _} = ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_terminate}, 60_000), {ok, _} = ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_terminate}, 60_000),
?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ?retry(
_Sleep0 = 100,
_Retries = 20,
?assertMatch({ok, disconnected}, health_check(Config))
),
ok ok
end, end,
[] []
@ -2161,7 +2184,6 @@ t_get_subscription(Config) ->
t_permission_denied_topic_check(Config) -> t_permission_denied_topic_check(Config) ->
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
%% the emulator does not check any credentials %% the emulator does not check any credentials
@ -2178,19 +2200,17 @@ t_permission_denied_topic_check(Config) ->
end end
end, end,
fun() -> fun() ->
{{ok, _}, {ok, _}} = {ok, _} = create_bridge(Config),
?wait_async_action(
create_bridge(Config),
#{?snk_kind := gcp_pubsub_stop},
5_000
),
?assertMatch( ?assertMatch(
{ok, disconnected}, {ok, disconnected},
emqx_resource_manager:health_check(ResourceId) health_check(Config)
), ),
?assertMatch( ?assertMatch(
{ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, #{
emqx_resource_manager:lookup_cached(ResourceId) status := disconnected,
error := {unhealthy_target, "Permission denied" ++ _}
},
health_check_channel(Config)
), ),
ok ok
end end
@ -2236,7 +2256,6 @@ t_permission_denied_worker(Config) ->
t_unauthenticated_topic_check(Config) -> t_unauthenticated_topic_check(Config) ->
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
%% the emulator does not check any credentials %% the emulator does not check any credentials
@ -2253,19 +2272,17 @@ t_unauthenticated_topic_check(Config) ->
end end
end, end,
fun() -> fun() ->
{{ok, _}, {ok, _}} = {ok, _} = create_bridge(Config),
?wait_async_action(
create_bridge(Config),
#{?snk_kind := gcp_pubsub_stop},
5_000
),
?assertMatch( ?assertMatch(
{ok, disconnected}, {ok, disconnected},
emqx_resource_manager:health_check(ResourceId) health_check(Config)
), ),
?assertMatch( ?assertMatch(
{ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, #{
emqx_resource_manager:lookup_cached(ResourceId) status := disconnected,
error := {unhealthy_target, "Permission denied" ++ _}
},
health_check_channel(Config)
), ),
ok ok
end end

View File

@ -26,6 +26,8 @@ resource_type(azure_event_hub_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(confluent_producer) -> resource_type(confluent_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(gcp_pubsub_consumer) ->
emqx_bridge_gcp_pubsub_impl_consumer;
resource_type(gcp_pubsub_producer) -> resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer; emqx_bridge_gcp_pubsub_impl_producer;
resource_type(kafka_producer) -> resource_type(kafka_producer) ->
@ -118,6 +120,14 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{gcp_pubsub_consumer,
mk(
hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")),
#{
desc => <<"GCP PubSub Consumer Connector Config">>,
required => false
}
)},
{gcp_pubsub_producer, {gcp_pubsub_producer,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")), hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")),
@ -309,6 +319,7 @@ schema_modules() ->
[ [
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge_confluent_producer, emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_consumer_schema,
emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_kafka, emqx_bridge_kafka,
emqx_bridge_kinesis, emqx_bridge_kinesis,
@ -344,6 +355,11 @@ api_schemas(Method) ->
api_ref( api_ref(
emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
), ),
api_ref(
emqx_bridge_gcp_pubsub_consumer_schema,
<<"gcp_pubsub_consumer">>,
Method ++ "_connector"
),
api_ref( api_ref(
emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_gcp_pubsub_producer_schema,
<<"gcp_pubsub_producer">>, <<"gcp_pubsub_producer">>,

View File

@ -126,6 +126,8 @@ connector_type_to_bridge_types(azure_event_hub_producer) ->
[azure_event_hub_producer]; [azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> connector_type_to_bridge_types(confluent_producer) ->
[confluent_producer]; [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_consumer) ->
[gcp_pubsub_consumer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> connector_type_to_bridge_types(gcp_pubsub_producer) ->
[gcp_pubsub, gcp_pubsub_producer]; [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(kafka_producer) -> connector_type_to_bridge_types(kafka_producer) ->
@ -527,14 +529,6 @@ fields(connectors) ->
required => false required => false
} }
)} )}
% {mqtt_subscriber,
% mk(
% hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")),
% #{
% desc => <<"MQTT Subscriber Connector Config">>,
% required => false
% }
% )}
] ++ enterprise_fields_connectors(); ] ++ enterprise_fields_connectors();
fields("node_status") -> fields("node_status") ->
[ [

View File

@ -115,6 +115,7 @@
| {error, term()}. | {error, term()}.
-type action_resource_id() :: resource_id(). -type action_resource_id() :: resource_id().
-type source_resource_id() :: resource_id().
-type connector_resource_id() :: resource_id(). -type connector_resource_id() :: resource_id().
-type message_tag() :: action_resource_id(). -type message_tag() :: action_resource_id().

View File

@ -0,0 +1,18 @@
emqx_bridge_gcp_pubsub_consumer_schema {
source_parameters.desc:
"""Source specific configs."""
source_parameters.label:
"""Source"""
consumer_source.desc:
"""Source configs."""
consumer_source.label:
"""Source"""
config_connector.desc:
"""Configuration for a GCP PubSub Consumer Client."""
config_connector.label:
"""GCP PubSub Consumer Client Configuration"""
}