From 9a32895a1a6d1f6561131e2f70bbd0b361c81860 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 6 Feb 2024 16:19:16 -0300 Subject: [PATCH] feat: convert `gcp_pubsub_consumer` to connector/source Fixes https://emqx.atlassian.net/browse/EMQX-11471 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge/src/emqx_bridge_v2.erl | 27 +- .../src/schema/emqx_bridge_v2_schema.erl | 17 +- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 8 +- .../src/emqx_bridge_gcp_pubsub.app.src | 7 +- .../src/emqx_bridge_gcp_pubsub_client.erl | 6 +- ...bridge_gcp_pubsub_consumer_action_info.erl | 87 ++++++ ...emqx_bridge_gcp_pubsub_consumer_schema.erl | 253 ++++++++++++++++++ ...emqx_bridge_gcp_pubsub_consumer_worker.erl | 79 +++--- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 211 ++++++++++----- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 177 ++++++------ .../src/schema/emqx_connector_ee_schema.erl | 16 ++ .../src/schema/emqx_connector_schema.erl | 10 +- apps/emqx_resource/include/emqx_resource.hrl | 1 + ...qx_bridge_gcp_pubsub_consumer_schema.hocon | 18 ++ 15 files changed, 719 insertions(+), 199 deletions(-) create mode 100644 apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl create mode 100644 apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl create mode 100644 rel/i18n/emqx_bridge_gcp_pubsub_consumer_schema.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e54ef6124..8959a8857 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -88,6 +88,7 @@ hard_coded_action_info_modules_ee() -> [ emqx_bridge_azure_event_hub_action_info, emqx_bridge_confluent_producer_action_info, + emqx_bridge_gcp_pubsub_consumer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_kinesis_action_info, diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7ee384b84..b2b57f0fc 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -27,7 +27,9 @@ %% Note: this is strange right now, because it lives in `emqx_bridge_v2', but it shall be %% refactored into a new module/application with appropriate name. -define(ROOT_KEY_ACTIONS, actions). +-define(ROOT_KEY_ACTIONS_BIN, <<"actions">>). -define(ROOT_KEY_SOURCES, sources). +-define(ROOT_KEY_SOURCES_BIN, <<"sources">>). %% Loading and unloading config when EMQX starts and stops -export([ @@ -91,6 +93,7 @@ -export([ id/2, id/3, + source_id/3, bridge_v1_is_valid/2, bridge_v1_is_valid/3, extract_connector_id_from_bridge_v2_id/1 @@ -475,7 +478,7 @@ augment_channel_config( bridge_type => bin(BridgeV2Type), bridge_name => bin(BridgeName) }, - case emqx_action_info:is_source(BridgeV2Type) of + case emqx_action_info:is_source(BridgeV2Type) andalso ConfigRoot =:= ?ROOT_KEY_SOURCES of true -> BId = emqx_bridge_resource:bridge_id(BridgeV2Type, BridgeName), BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId), @@ -750,7 +753,9 @@ create_dry_run(ConfRootKey, Type, Conf0) -> not_found -> {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) + create_dry_run_helper( + ensure_atom_root_key(ConfRootKey), Type, ConnectorRawConf, Conf1 + ) end catch %% validation errors @@ -758,9 +763,6 @@ create_dry_run(ConfRootKey, Type, Conf0) -> {error, Reason1} end. -create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> - create_dry_run_helper(?ROOT_KEY_ACTIONS, BridgeType, ConnectorRawConf, BridgeV2RawConf). - create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), ConnectorType = connector_type(BridgeType), @@ -969,6 +971,9 @@ id(BridgeType, BridgeName) -> id(BridgeType, BridgeName, ConnectorName) -> id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, ConnectorName). +source_id(BridgeType, BridgeName, ConnectorName) -> + id_with_root_name(?ROOT_KEY_SOURCES, BridgeType, BridgeName, ConnectorName). + id_with_root_name(RootName, BridgeType, BridgeName) -> case lookup_conf(RootName, BridgeType, BridgeName) of #{connector := ConnectorName} -> @@ -1633,6 +1638,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> PreviousRawConf = undefined, try #{ + conf_root_key := ConfRootKey, connector_type := _ConnectorType, connector_name := _NewConnectorName, connector_conf := ConnectorRawConf, @@ -1640,7 +1646,9 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> bridge_v2_name := _BridgeName, bridge_v2_conf := BridgeV2RawConf } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), - create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) + create_dry_run_helper( + ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf + ) catch throw:Reason -> {error, Reason} @@ -1858,6 +1866,13 @@ extract_connector_id_from_bridge_v2_id(Id) -> error({error, iolist_to_binary(io_lib:format("Invalid action ID: ~p", [Id]))}) end. +ensure_atom_root_key(ConfRootKey) when is_atom(ConfRootKey) -> + ConfRootKey; +ensure_atom_root_key(?ROOT_KEY_ACTIONS_BIN) -> + ?ROOT_KEY_ACTIONS; +ensure_atom_root_key(?ROOT_KEY_SOURCES_BIN) -> + ?ROOT_KEY_SOURCES. + to_existing_atom(X) -> case emqx_utils:safe_to_existing_atom(X, utf8) of {ok, A} -> A; diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 2708df87d..2ccd04834 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -64,8 +64,9 @@ make_producer_action_schema/1, make_producer_action_schema/2, make_consumer_action_schema/1, make_consumer_action_schema/2, top_level_common_action_keys/0, + top_level_common_source_keys/0, project_to_actions_resource_opts/1, - project_to_sources_resource_opts/1 + project_to_sources_resource_opts/1, project_to_sources_resource_opts/2 ]). -export([actions_convert_from_connectors/1]). @@ -422,6 +423,16 @@ top_level_common_action_keys() -> <<"resource_opts">> ]. +top_level_common_source_keys() -> + [ + <<"connector">>, + <<"tags">>, + <<"description">>, + <<"enable">>, + <<"parameters">>, + <<"resource_opts">> + ]. + %%====================================================================================== %% Helper functions for making HOCON Schema %%====================================================================================== @@ -474,7 +485,9 @@ project_to_actions_resource_opts(OldResourceOpts) -> maps:with(Subfields, OldResourceOpts). project_to_sources_resource_opts(OldResourceOpts) -> - Subfields = common_source_resource_opts_subfields_bin(), + project_to_sources_resource_opts(OldResourceOpts, common_source_resource_opts_subfields_bin()). + +project_to_sources_resource_opts(OldResourceOpts, Subfields) -> maps:with(Subfields, OldResourceOpts). actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 7ab8c68a6..7e186f531 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -105,9 +105,15 @@ parse_and_check(BridgeType, BridgeName, ConfigString) -> BridgeConfig. resource_id(Config) -> + BridgeKind = proplists:get_value(bridge_kind, Config, action), + ConfRootKey = + case BridgeKind of + action -> actions; + source -> sources + end, BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, BridgeName). + emqx_bridge_resource:resource_id(ConfRootKey, BridgeType, BridgeName). create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index c4a814da9..f4af9445b 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -8,7 +8,12 @@ emqx_resource, ehttpc ]}, - {env, [{emqx_action_info_modules, [emqx_bridge_gcp_pubsub_producer_action_info]}]}, + {env, [ + {emqx_action_info_modules, [ + emqx_bridge_gcp_pubsub_producer_action_info, + emqx_bridge_gcp_pubsub_consumer_action_info + ]} + ]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index 17bc10c4b..45d408629 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -180,17 +180,17 @@ query_async( ), do_send_requests_async(State, {prepared_request, PreparedRequest, ReqOpts}, ReplyFunAndArgs). --spec get_status(state()) -> connected | disconnected. +-spec get_status(state()) -> ?status_connected | ?status_disconnected. get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> case do_get_status(PoolName, Timeout) of true -> - connected; + ?status_connected; false -> ?SLOG(error, #{ msg => "gcp_pubsub_bridge_get_status_failed", state => State }), - disconnected + ?status_disconnected end. %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl new file mode 100644 index 000000000..16129b997 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl @@ -0,0 +1,87 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_consumer_action_info). + +-behaviour(emqx_action_info). + +-export([ + is_source/0, + is_action/0, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2 +]). + +is_source() -> true. + +is_action() -> false. + +bridge_v1_type_name() -> gcp_pubsub_consumer. + +action_type_name() -> gcp_pubsub_consumer. + +connector_type_name() -> gcp_pubsub_consumer. + +schema_module() -> emqx_bridge_gcp_pubsub_consumer_schema. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + CommonSourceKeys = emqx_bridge_v2_schema:top_level_common_source_keys(), + ParamsKeys = source_action_parameters_field_keys(), + Config1 = maps:with(CommonSourceKeys, BridgeV1Config), + ConsumerCfg = maps:get(<<"consumer">>, BridgeV1Config, #{}), + Params = maps:with(ParamsKeys, ConsumerCfg), + {source, gcp_pubsub_consumer, + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(Cfg) -> + emqx_bridge_v2_schema:project_to_sources_resource_opts( + Cfg, + resource_opts_fields() + ) + end, + Config1#{ + <<"connector">> => ConnectorName, + <<"parameters">> => Params + } + )}. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, SourceConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, SourceConfig), + BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), + BridgeV1Config3 = + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end, + BridgeV1Config2 + ), + emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config3). + +%%------------------------------------------------------------------------------------------ +%% Internal helper fns +%%------------------------------------------------------------------------------------------ + +resource_opts_fields() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_gcp_pubsub_consumer_schema:fields(source_resource_opts) + ]. + +bridge_v1_resource_opts_fields() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_gcp_pubsub:fields("consumer_resource_opts") + ]. + +source_action_parameters_field_keys() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_gcp_pubsub_consumer_schema:fields(source_parameters) + ]. + +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl new file mode 100644 index 000000000..0be13d64a --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl @@ -0,0 +1,253 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_consumer_schema). + +-import(hoconsc, [mk/2, ref/2]). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% `emqx_bridge_v2_schema' "unofficial" API +-export([ + source_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + +-define(CONNECTOR_TYPE, gcp_pubsub_consumer). +-define(SOURCE_TYPE, gcp_pubsub_consumer). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "gcp_pubsub_consumer". + +roots() -> + []. + +%%========================================= +%% Action fields +%%========================================= +fields(source) -> + {gcp_pubsub_consumer, + mk( + hoconsc:map(name, ref(?MODULE, consumer_source)), + #{ + desc => <<"GCP PubSub Consumer Source Config">>, + required => false + } + )}; +fields(consumer_source) -> + emqx_bridge_v2_schema:make_consumer_action_schema( + mk( + ref(?MODULE, source_parameters), + #{ + required => true, + desc => ?DESC(consumer_source) + } + ), + #{resource_opts_ref => ref(?MODULE, source_resource_opts)} + ); +fields(source_parameters) -> + %% FIXME: check + emqx_bridge_gcp_pubsub:fields(consumer); +fields(source_resource_opts) -> + Fields = [ + health_check_interval, + %% the workers pull the messages + request_ttl, + resume_interval + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, Fields) end, + emqx_resource_schema:create_opts( + _Overrides = [ + {health_check_interval, #{default => <<"30s">>}} + ] + ) + ); +%%========================================= +%% Connector fields +%%========================================= +fields("config_connector") -> + %% FIXME + emqx_connector_schema:common_fields() ++ + connector_config_fields(); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +%%========================================= +%% HTTP API fields: source +%%========================================= +fields(Field) when + Field == "get_source"; + Field == "post_source"; + Field == "put_source" +-> + emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(consumer_source)); +%%========================================= +%% HTTP API fields: connector +%%========================================= +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, connector_config_fields()). + +connector_config_fields() -> + emqx_bridge_gcp_pubsub:fields(connector_config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). + +desc("config_connector") -> + ?DESC("config_connector"); +desc(source_parameters) -> + ?DESC(source_parameters); +desc(consumer_source) -> + ?DESC(consumer_source); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(source_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(_Name) -> + undefined. + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +source_examples(Method) -> + [ + #{ + <<"gcp_pubsub_consumer">> => #{ + summary => <<"GCP PubSub Consumer Source">>, + value => source_example(Method) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"gcp_pubsub_consumer">> => #{ + summary => <<"GCP PubSub Consumer Connector">>, + value => connector_example(Method) + } + } + ]. + +conn_bridge_examples(Method) -> + emqx_bridge_gcp_pubsub:conn_bridge_examples(Method). + +source_example(post) -> + maps:merge( + source_example(put), + #{ + type => <<"gcp_pubsub_consumer">>, + name => <<"my_action">> + } + ); +source_example(get) -> + maps:merge( + source_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +source_example(put) -> + #{ + enable => true, + connector => <<"my_connector_name">>, + description => <<"My action">>, + local_topic => <<"local/topic">>, + resource_opts => + #{batch_size => 5}, + parameters => + #{ + pubsub_topic => <<"mytopic">>, + ordering_key_template => <<"${payload.ok}">>, + payload_template => <<"${payload}">>, + attributes_template => + [ + #{ + key => <<"${payload.attrs.k}">>, + value => <<"${payload.attrs.v}">> + } + ] + } + }. + +connector_example(get) -> + maps:merge( + connector_example(post), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + } + ); +connector_example(post) -> + maps:merge( + connector_example(put), + #{ + type => <<"gcp_pubsub_producer">>, + name => <<"my_connector">> + } + ); +connector_example(put) -> + %% FIXME: revisit + #{ + enable => true, + connect_timeout => <<"10s">>, + pool_size => 8, + pipelining => 100, + max_retries => 2, + resource_opts => #{request_ttl => <<"60s">>}, + service_account_json => + #{ + auth_provider_x509_cert_url => + <<"https://www.googleapis.com/oauth2/v1/certs">>, + auth_uri => + <<"https://accounts.google.com/o/oauth2/auth">>, + client_email => + <<"test@myproject.iam.gserviceaccount.com">>, + client_id => <<"123812831923812319190">>, + client_x509_cert_url => + << + "https://www.googleapis.com/robot/v1/" + "metadata/x509/test%40myproject.iam.gserviceaccount.com" + >>, + private_key => + << + "-----BEGIN PRIVATE KEY-----\n" + "MIIEvQI..." + >>, + private_key_id => <<"kid">>, + project_id => <<"myproject">>, + token_uri => + <<"https://oauth2.googleapis.com/token">>, + type => <<"service_account">> + } + }. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index c25e7e1ea..39e1c19d5 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -37,8 +37,9 @@ client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id => non_neg_integer(), forget_interval := duration(), - hookpoint := binary(), - instance_id := binary(), + hookpoints := [binary()], + connector_resource_id := binary(), + source_resource_id := binary(), mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), @@ -55,8 +56,9 @@ client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id := non_neg_integer(), forget_interval := duration(), - hookpoint := binary(), - instance_id := binary(), + hookpoints := [binary()], + connector_resource_id := binary(), + source_resource_id := binary(), mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), pending_acks := #{message_id() => ack_id()}, project_id := emqx_bridge_gcp_pubsub_client:project_id(), @@ -97,7 +99,7 @@ ensure_subscription(WorkerPid) -> gen_server:cast(WorkerPid, ensure_subscription). -spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok. -reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) -> +reply_delegator(WorkerPid, pull_async = _Action, SourceResId, Result) -> ?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}), case Result of {error, timeout} -> @@ -107,7 +109,7 @@ reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) -> warning, "gcp_pubsub_consumer_worker_pull_error", #{ - instance_id => InstanceId, + instance_id => SourceResId, reason => Reason } ), @@ -156,8 +158,9 @@ connect(Opts0) -> client := Client, ecpool_worker_id := WorkerId, forget_interval := ForgetInterval, - hookpoint := Hookpoint, - instance_id := InstanceId, + hookpoints := Hookpoints, + connector_resource_id := ConnectorResId, + source_resource_id := SourceResId, project_id := ProjectId, pull_max_messages := PullMaxMessages, pull_retry_interval := PullRetryInterval, @@ -175,8 +178,9 @@ connect(Opts0) -> %% workers. client => Client, forget_interval => ForgetInterval, - hookpoint => Hookpoint, - instance_id => InstanceId, + hookpoints => Hookpoints, + connector_resource_id => ConnectorResId, + source_resource_id => SourceResId, mqtt_config => MQTTConfig, project_id => ProjectId, pull_max_messages => PullMaxMessages, @@ -185,6 +189,7 @@ connect(Opts0) -> topic => Topic, subscription_id => subscription_id(BridgeName, Topic) }, + ?tp(gcp_pubsub_consumer_worker_about_to_spawn, #{}), start_link(Config). %%------------------------------------------------------------------------------------------------- @@ -209,13 +214,13 @@ handle_continue(?ensure_subscription, State0) -> already_exists -> {noreply, State0, {continue, ?patch_subscription}}; continue -> - #{instance_id := InstanceId} = State0, + #{source_resource_id := SourceResId} = State0, ?MODULE:pull_async(self()), optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), ?tp( debug, "gcp_pubsub_consumer_worker_subscription_ready", - #{instance_id => InstanceId} + #{instance_id => SourceResId} ), {noreply, State0}; retry -> @@ -232,13 +237,13 @@ handle_continue(?patch_subscription, State0) -> ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}), case patch_subscription(State0) of ok -> - #{instance_id := InstanceId} = State0, + #{source_resource_id := SourceResId} = State0, ?MODULE:pull_async(self()), optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), ?tp( debug, "gcp_pubsub_consumer_worker_subscription_ready", - #{instance_id => InstanceId} + #{instance_id => SourceResId} ), {noreply, State0}; error -> @@ -292,13 +297,13 @@ handle_info({forget_message_ids, MsgIds}, State0) -> {noreply, State}; handle_info(Msg, State0) -> #{ - instance_id := InstanceId, + source_resource_id := SoureceResId, topic := Topic } = State0, ?SLOG(debug, #{ msg => "gcp_pubsub_consumer_worker_unexpected_message", unexpected_msg => Msg, - instance_id => InstanceId, + instance_id => SoureceResId, topic => Topic }), {noreply, State0}. @@ -309,11 +314,11 @@ terminate({error, Reason}, State) when Reason =:= permission_denied -> #{ - instance_id := InstanceId, + source_resource_id := SourceResId, topic := _Topic } = State, optvar:unset(?OPTVAR_SUB_OK(self())), - emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(InstanceId, Reason), + emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(SourceResId, Reason), ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}), ok; terminate(_Reason, _State) -> @@ -351,7 +356,7 @@ ensure_subscription_exists(State) -> ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}), #{ client := Client, - instance_id := InstanceId, + source_resource_id := SourceResId, request_ttl := RequestTTL, subscription_id := SubscriptionId, topic := Topic @@ -369,7 +374,7 @@ ensure_subscription_exists(State) -> debug, "gcp_pubsub_consumer_worker_subscription_already_exists", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic, subscription_id => SubscriptionId } @@ -381,7 +386,7 @@ ensure_subscription_exists(State) -> warning, "gcp_pubsub_consumer_worker_nonexistent_topic", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic } ), @@ -392,7 +397,7 @@ ensure_subscription_exists(State) -> warning, "gcp_pubsub_consumer_worker_permission_denied", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic } ), @@ -403,7 +408,7 @@ ensure_subscription_exists(State) -> warning, "gcp_pubsub_consumer_worker_bad_credentials", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic } ), @@ -413,7 +418,7 @@ ensure_subscription_exists(State) -> debug, "gcp_pubsub_consumer_worker_subscription_created", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic, subscription_id => SubscriptionId } @@ -424,7 +429,7 @@ ensure_subscription_exists(State) -> error, "gcp_pubsub_consumer_worker_subscription_error", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic, reason => Reason } @@ -436,7 +441,7 @@ ensure_subscription_exists(State) -> patch_subscription(State) -> #{ client := Client, - instance_id := InstanceId, + source_resource_id := SourceResId, subscription_id := SubscriptionId, request_ttl := RequestTTL, topic := Topic @@ -453,7 +458,7 @@ patch_subscription(State) -> debug, "gcp_pubsub_consumer_worker_subscription_patched", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic, subscription_id => SubscriptionId, result => Res @@ -465,7 +470,7 @@ patch_subscription(State) -> warning, "gcp_pubsub_consumer_worker_subscription_patch_error", #{ - instance_id => InstanceId, + instance_id => SourceResId, topic => Topic, subscription_id => SubscriptionId, reason => Reason @@ -483,7 +488,7 @@ do_pull_async(State0) -> begin #{ client := Client, - instance_id := InstanceId, + source_resource_id := SourceResId, request_ttl := RequestTTL } = State0, Method = post, @@ -491,7 +496,7 @@ do_pull_async(State0) -> Body = body(State0, pull), ReqOpts = #{request_ttl => RequestTTL}, PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, - ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, + ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, SourceResId]}, Res = emqx_bridge_gcp_pubsub_client:query_async( PreparedRequest, ReplyFunAndArgs, @@ -734,8 +739,8 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}, begin #{ - instance_id := InstanceId, - hookpoint := Hookpoint, + source_resource_id := SourceResId, + hookpoints := Hookpoints, mqtt_config := #{ payload_template := PayloadTemplate, qos := MQTTQoS, @@ -764,11 +769,15 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess {<<"orderingKey">>, ordering_key} ] ), + %% TODO: this should be optional Payload = render(FullMessage, PayloadTemplate), - MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), + MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload), _ = emqx:publish(MQTTMessage), - emqx_hooks:run(Hookpoint, [FullMessage]), - emqx_resource_metrics:received_inc(InstanceId), + lists:foreach( + fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, + Hookpoints + ), + emqx_resource_metrics:received_inc(SourceResId), ok end ). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index bcc5c818e..9da56c677 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -12,7 +12,12 @@ query_mode/1, on_start/2, on_stop/2, - on_get_status/2 + on_get_status/2, + + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). %% health check API @@ -31,7 +36,7 @@ qos := emqx_types:qos(), payload_template := string() }. --type config() :: #{ +-type connector_config() :: #{ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pool_size := non_neg_integer(), @@ -39,9 +44,27 @@ service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), any() => term() }. --type state() :: #{ - client := emqx_bridge_gcp_pubsub_client:state() +-type connector_state() :: #{ + client := emqx_bridge_gcp_pubsub_client:state(), + installed_sources := #{source_resource_id() => source_state()}, + project_id := binary() }. +-type topic_mapping() :: #{ + pubsub_topic := binary(), + mqtt_topic := binary(), + qos := emqx_types:qos(), + payload_template := binary() +}. +-type source_config() :: #{ + bridge_name := binary(), + hookpoints := [binary()], + parameters := #{ + consumer_workers_per_topic := pos_integer(), + topic_mapping := [topic_mapping(), ...] + }, + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()} +}. +-type source_state() :: #{}. -export_type([mqtt_config/0]). @@ -67,38 +90,91 @@ callback_mode() -> async_if_possible. -spec query_mode(any()) -> query_mode(). query_mode(_Config) -> no_queries. --spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -on_start(InstanceId, Config0) -> +-spec on_start(connector_resource_id(), connector_config()) -> + {ok, connector_state()} | {error, term()}. +on_start(ConnectorResId, Config0) -> %% ensure it's a binary key map Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), - case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of + #{service_account_json := #{<<"project_id">> := ProjectId}} = Config, + case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of {ok, Client} -> - start_consumers(InstanceId, Client, Config); + ConnectorState = #{ + client => Client, + installed_sources => #{}, + project_id => ProjectId + }, + {ok, ConnectorState}; Error -> Error end. --spec on_stop(resource_id(), state()) -> ok | {error, term()}. -on_stop(InstanceId, _State) -> +-spec on_stop(resource_id(), connector_state()) -> ok | {error, term()}. +on_stop(ConnectorResId, ConnectorState) -> ?tp(gcp_pubsub_consumer_stop_enter, #{}), - clear_unhealthy(InstanceId), - ok = stop_consumers(InstanceId), - emqx_bridge_gcp_pubsub_client:stop(InstanceId). + clear_unhealthy(ConnectorState), + ok = stop_consumers(ConnectorState), + emqx_bridge_gcp_pubsub_client:stop(ConnectorResId). --spec on_get_status(resource_id(), state()) -> connected | connecting | {disconnected, state(), _}. -on_get_status(InstanceId, State) -> +-spec on_get_status(resource_id(), connector_state()) -> + ?status_connected | ?status_connecting. +on_get_status(_ConnectorResId, ConnectorState) -> + #{client := Client} = ConnectorState, + get_client_status(Client). + +-spec on_add_channel( + connector_resource_id(), + connector_state(), + source_resource_id(), + source_config() +) -> + {ok, connector_state()}. +on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) -> + #{ + client := Client, + installed_sources := InstalledSources0, + project_id := ProjectId + } = ConnectorState0, + case start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) of + {ok, SourceState} -> + InstalledSources = InstalledSources0#{SourceResId => SourceState}, + ConnectorState = ConnectorState0#{installed_sources := InstalledSources}, + {ok, ConnectorState}; + Error = {error, _} -> + Error + end. + +-spec on_remove_channel( + connector_resource_id(), + connector_state(), + source_resource_id() +) -> + {ok, connector_state()}. +on_remove_channel(_ConnectorResId, ConnectorState0, SourceResId) -> + #{installed_sources := InstalledSources0} = ConnectorState0, + InstalledSources = maps:remove(SourceResId, InstalledSources0), + ConnectorState = ConnectorState0#{installed_sources := InstalledSources}, + {ok, ConnectorState}. + +-spec on_get_channels(connector_resource_id()) -> + [{action_resource_id(), source_config()}]. +on_get_channels(ConnectorResId) -> + emqx_bridge_v2:get_channels_for_connector(ConnectorResId). + +-spec on_get_channel_status(connector_resource_id(), source_resource_id(), connector_state()) -> + health_check_status(). +on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) -> %% We need to check this flag separately because the workers might be gone when we %% check them. - case check_if_unhealthy(InstanceId) of + case check_if_unhealthy(SourceResId) of {error, topic_not_found} -> - {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; + {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}}; {error, permission_denied} -> - {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; + {?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}}; {error, bad_credentials} -> - {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; + {?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}}; ok -> - #{client := Client} = State, - check_workers(InstanceId, Client) + #{client := Client} = ConnectorState, + check_workers(SourceResId, Client) end. %%------------------------------------------------------------------------------------------------- @@ -106,29 +182,35 @@ on_get_status(InstanceId, State) -> %%------------------------------------------------------------------------------------------------- -spec mark_as_unhealthy( - resource_id(), + source_resource_id(), topic_not_found | permission_denied | bad_credentials ) -> ok. -mark_as_unhealthy(InstanceId, Reason) -> - optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason), +mark_as_unhealthy(SourceResId, Reason) -> + optvar:set(?OPTVAR_UNHEALTHY(SourceResId), Reason), ok. --spec clear_unhealthy(resource_id()) -> ok. -clear_unhealthy(InstanceId) -> - optvar:unset(?OPTVAR_UNHEALTHY(InstanceId)), +-spec clear_unhealthy(connector_state()) -> ok. +clear_unhealthy(ConnectorState) -> + #{installed_sources := InstalledSources} = ConnectorState, + maps:foreach( + fun(SourceResId, _SourceState) -> + optvar:unset(?OPTVAR_UNHEALTHY(SourceResId)) + end, + InstalledSources + ), ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}), ok. --spec check_if_unhealthy(resource_id()) -> +-spec check_if_unhealthy(source_resource_id()) -> ok | {error, topic_not_found | permission_denied | bad_credentials}. -check_if_unhealthy(InstanceId) -> - case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of +check_if_unhealthy(SourceResId) -> + case optvar:peek(?OPTVAR_UNHEALTHY(SourceResId)) of {ok, Reason} -> {error, Reason}; undefined -> @@ -139,14 +221,13 @@ check_if_unhealthy(InstanceId) -> %% Internal fns %%------------------------------------------------------------------------------------------------- -start_consumers(InstanceId, Client, Config) -> +start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) -> #{ bridge_name := BridgeName, - consumer := ConsumerConfig0, - hookpoint := Hookpoint, - resource_opts := #{request_ttl := RequestTTL}, - service_account_json := #{<<"project_id">> := ProjectId} - } = Config, + parameters := ConsumerConfig0, + hookpoints := Hookpoints, + resource_opts := #{request_ttl := RequestTTL} + } = SourceConfig, ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), TopicMapping = maps:get(topic_mapping, ConsumerConfig1), ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1), @@ -156,8 +237,9 @@ start_consumers(InstanceId, Client, Config) -> bridge_name => BridgeName, client => Client, forget_interval => forget_interval(RequestTTL), - hookpoint => Hookpoint, - instance_id => InstanceId, + hookpoints => Hookpoints, + connector_resource_id => ConnectorResId, + source_resource_id => SourceResId, pool_size => PoolSize, project_id => ProjectId, pull_retry_interval => RequestTTL, @@ -169,17 +251,14 @@ start_consumers(InstanceId, Client, Config) -> ok -> ok; {error, not_found} -> - _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), throw( {unhealthy_target, ?TOPIC_MESSAGE} ); {error, permission_denied} -> - _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), throw( {unhealthy_target, ?PERMISSION_MESSAGE} ); {error, bad_credentials} -> - _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), throw( {unhealthy_target, ?PERMISSION_MESSAGE} ); @@ -190,30 +269,34 @@ start_consumers(InstanceId, Client, Config) -> ok end, case - emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts) + emqx_resource_pool:start(SourceResId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts) of ok -> State = #{ client => Client, - pool_name => InstanceId + pool_name => SourceResId }, {ok, State}; {error, Reason} -> - _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), {error, Reason} end. -stop_consumers(InstanceId) -> - _ = log_when_error( - fun() -> - ok = emqx_resource_pool:stop(InstanceId) +stop_consumers(ConnectorState) -> + #{installed_sources := InstalledSources} = ConnectorState, + maps:foreach( + fun(SourceResId, _SourceState) -> + _ = log_when_error( + fun() -> + ok = emqx_resource_pool:stop(SourceResId) + end, + #{ + msg => "failed_to_stop_pull_worker_pool", + instance_id => SourceResId + } + ) end, - #{ - msg => "failed_to_stop_pull_worker_pool", - instance_id => InstanceId - } - ), - ok. + InstalledSources + ). convert_topic_mapping(TopicMappingList) -> lists:foldl( @@ -268,35 +351,37 @@ check_for_topic_existence(Topic, Client, ReqOpts) -> {error, Reason} end. --spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. +-spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) -> + ?status_connected | ?status_connecting. get_client_status(Client) -> case emqx_bridge_gcp_pubsub_client:get_status(Client) of - disconnected -> connecting; - connected -> connected + ?status_disconnected -> ?status_connecting; + ?status_connected -> ?status_connected end. --spec check_workers(resource_id(), emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. -check_workers(InstanceId, Client) -> +-spec check_workers(source_resource_id(), emqx_bridge_gcp_pubsub_client:state()) -> + ?status_connected | ?status_connecting. +check_workers(SourceResId, Client) -> case emqx_resource_pool:health_check_workers( - InstanceId, + SourceResId, fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1, emqx_resource_pool:health_check_timeout(), #{return_values => true} ) of {ok, []} -> - connecting; + ?status_connecting; {ok, Values} -> AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values), case AllOk of true -> get_client_status(Client); false -> - connecting + ?status_connecting end; {error, _} -> - connecting + ?status_connecting end. log_when_error(Fun, Log) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 4b179b050..78167d439 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -13,6 +13,8 @@ -define(BRIDGE_TYPE, gcp_pubsub_consumer). -define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>). +-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_consumer">>). +-define(SOURCE_TYPE_BIN, <<"gcp_pubsub_consumer">>). -define(REPUBLISH_TOPIC, <<"republish/t">>). -define(PREPARED_REQUEST(METHOD, PATH, BODY), {prepared_request, {METHOD, PATH, BODY}, #{request_ttl => 1_000}} @@ -147,6 +149,7 @@ common_init_per_testcase(TestCase, Config0) -> ensure_topics(Config), ok = snabbkaffe:start_trace(), [ + {bridge_kind, source}, {bridge_type, ?BRIDGE_TYPE}, {bridge_name, Name}, {bridge_config, ConsumerConfig}, @@ -157,18 +160,13 @@ common_init_per_testcase(TestCase, Config0) -> ]. end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - emqx_bridge_testlib:delete_all_bridges(), - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - ok - end. + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok. %%------------------------------------------------------------------------------ %% Helper fns @@ -421,9 +419,22 @@ start_and_subscribe_mqtt(Config) -> ok. resource_id(Config) -> - Type = ?BRIDGE_TYPE_BIN, Name = ?config(consumer_name, Config), - emqx_bridge_resource:resource_id(Type, Name). + emqx_bridge_v2:source_id(?SOURCE_TYPE_BIN, Name, Name). + +connector_resource_id(Config) -> + Name = ?config(consumer_name, Config), + emqx_connector_resource:resource_id(?CONNECTOR_TYPE_BIN, Name). + +health_check(Config) -> + #{status := Status} = health_check_channel(Config), + {ok, Status}. + +health_check_channel(Config) -> + Name = ?config(consumer_name, Config), + ConnectorResId = emqx_connector_resource:resource_id(?CONNECTOR_TYPE_BIN, Name), + SourceResId = resource_id(Config), + emqx_resource_manager:channel_health_check(ConnectorResId, SourceResId). bridge_id(Config) -> Type = ?BRIDGE_TYPE_BIN, @@ -564,7 +575,7 @@ get_pull_worker_pids(Config) -> Pids. get_async_worker_pids(Config) -> - ResourceId = resource_id(Config), + ResourceId = connector_resource_id(Config), Pids = [ AsyncWorkerPid @@ -902,7 +913,6 @@ unauthenticated_response() -> %%------------------------------------------------------------------------------ t_start_stop(Config) -> - ResourceId = resource_id(Config), [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), ?check_trace( begin @@ -913,7 +923,7 @@ t_start_stop(Config) -> ), ?assertMatch({ok, _}, create_bridge(Config)), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connected}, health_check(Config)), ?assertMatch(ok, remove_bridge(Config)), ok @@ -1045,8 +1055,8 @@ t_consume_ok(Config) -> ), %% Check that the bridge probe API doesn't leak atoms. - ProbeRes0 = probe_bridge_api(Config), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, probe_bridge_api(Config)), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, probe_bridge_api(Config)), AtomsBefore = erlang:system_info(atom_count), %% Probe again; shouldn't have created more atoms. ProbeRes1 = probe_bridge_api(Config), @@ -1143,13 +1153,12 @@ t_bridge_rule_action_source(Config) -> ok. t_on_get_status(Config) -> - ResourceId = resource_id(Config), emqx_bridge_testlib:t_on_get_status(Config, #{failure_status => connecting}), %% no workers alive ?retry( _Interval0 = 200, _NAttempts0 = 20, - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch({ok, connected}, health_check(Config)) ), WorkerPids = get_pull_worker_pids(Config), emqx_utils:pmap( @@ -1163,7 +1172,7 @@ t_on_get_status(Config) -> end, WorkerPids ), - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connecting}, health_check(Config)), ok. t_create_update_via_http_api(Config) -> @@ -1331,7 +1340,6 @@ t_multiple_pull_workers(Config) -> t_nonexistent_topic(Config) -> BridgeName = ?config(bridge_name, Config), [Mapping0] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), PubSubTopic = <<"nonexistent-", (emqx_guid:to_hexstr(emqx_guid:gen()))/binary>>, TopicMapping0 = [Mapping0#{pubsub_topic := PubSubTopic}], TopicMapping = emqx_utils_maps:binary_key_map(TopicMapping0), @@ -1347,11 +1355,14 @@ t_nonexistent_topic(Config) -> ), ?assertMatch( {ok, disconnected}, - emqx_resource_manager:health_check(ResourceId) + health_check(Config) ), ?assertMatch( - {ok, _Group, #{error := {unhealthy_target, "GCP PubSub topics are invalid" ++ _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := disconnected, + error := {unhealthy_target, "GCP PubSub topics are invalid" ++ _} + }, + health_check_channel(Config) ), %% now create the topic and restart the bridge ensure_topic(Config, PubSubTopic), @@ -1362,11 +1373,14 @@ t_nonexistent_topic(Config) -> ?retry( _Interval0 = 200, _NAttempts0 = 20, - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch({ok, connected}, health_check(Config)) ), ?assertMatch( - {ok, _Group, #{error := undefined}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := connected, + error := undefined + }, + health_check_channel(Config) ), ok end, @@ -1383,7 +1397,6 @@ t_nonexistent_topic(Config) -> t_topic_deleted_while_consumer_is_running(Config) -> TopicMapping = [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), NTopics = length(TopicMapping), - ResourceId = resource_id(Config), ?check_trace( begin {ok, SRef0} = @@ -1395,7 +1408,7 @@ t_topic_deleted_while_consumer_is_running(Config) -> {ok, _} = create_bridge(Config), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connected}, health_check(Config)), %% curiously, gcp pubsub doesn't seem to return any errors from the %% subscription if the topic is deleted while the subscription still exists... @@ -1421,7 +1434,6 @@ t_connection_down_before_starting(Config) -> ProxyName = ?config(proxy_name, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), - ResourceId = resource_id(Config), ?check_trace( begin emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> @@ -1433,13 +1445,13 @@ t_connection_down_before_starting(Config) -> 10_000 ) ), - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, disconnected}, health_check(Config)), ok end), ?retry( _Interval0 = 200, _NAttempts0 = 20, - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch({ok, connected}, health_check(Config)) ), ok end, @@ -1451,7 +1463,6 @@ t_connection_timeout_before_starting(Config) -> ProxyName = ?config(proxy_name, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), - ResourceId = resource_id(Config), ?check_trace( begin emqx_common_test_helpers:with_failure( @@ -1464,14 +1475,14 @@ t_connection_timeout_before_starting(Config) -> 10_000 ) ), - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connecting}, health_check(Config)), ok end ), ?retry( _Interval0 = 200, _NAttempts0 = 20, - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch({ok, connected}, health_check(Config)) ), ok end, @@ -1480,7 +1491,6 @@ t_connection_timeout_before_starting(Config) -> ok. t_pull_worker_death(Config) -> - ResourceId = resource_id(Config), ?check_trace( begin ?assertMatch( @@ -1500,13 +1510,13 @@ t_pull_worker_death(Config) -> ok after 500 -> ct:fail("pull worker didn't die") end, - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connecting}, health_check(Config)), %% recovery ?retry( _Interval0 = 200, _NAttempts0 = 20, - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch({ok, connected}, health_check(Config)) ), ok @@ -1641,19 +1651,32 @@ t_connection_error_while_creating_subscription(Config) -> ProxyPort = ?config(proxy_port, Config), ?check_trace( begin - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - %% check retries - {ok, SRef0} = - snabbkaffe:subscribe( - ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}), - _NEvents0 = 2, - 10_000 - ), - {ok, _} = create_bridge(Config), - {ok, _} = snabbkaffe:receive_events(SRef0), - ok + ?force_ordering( + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + #{?snk_kind := will_cut_connection} + ), + ?force_ordering( + #{?snk_kind := connection_down}, + #{?snk_kind := gcp_pubsub_consumer_worker_create_subscription_enter} + ), + spawn_link(fun() -> + ?tp(notice, will_cut_connection, #{}), + emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort), + ?tp(notice, connection_down, #{}) end), - %% eventually succeeds + %% check retries + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}), + _NEvents0 = 2, + 10_000 + ), + {ok, _} = create_bridge(Config), + {ok, _} = snabbkaffe:receive_events(SRef0), + emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), + + %% should eventually succeed + ?tp(notice, "waiting for recovery", #{}), {ok, _} = ?block_until( #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, @@ -1763,7 +1786,6 @@ t_subscription_patch_error(Config) -> t_topic_deleted_while_creating_subscription(Config) -> [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), ?check_trace( begin ?force_ordering( @@ -1787,7 +1809,7 @@ t_topic_deleted_while_creating_subscription(Config) -> #{?snk_kind := gcp_pubsub_consumer_worker_terminate}, 10_000 ), - ?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, disconnected}, health_check(Config)), ok end, [] @@ -1797,7 +1819,6 @@ t_topic_deleted_while_creating_subscription(Config) -> t_topic_deleted_while_patching_subscription(Config) -> BridgeName = ?config(bridge_name, Config), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), ?check_trace( begin {{ok, _}, {ok, _}} = @@ -1832,7 +1853,7 @@ t_topic_deleted_while_patching_subscription(Config) -> #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 ), - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connected}, health_check(Config)), ok end, [] @@ -1840,7 +1861,6 @@ t_topic_deleted_while_patching_subscription(Config) -> ok. t_subscription_deleted_while_consumer_is_running(Config) -> - ResourceId = resource_id(Config), ?check_trace( begin {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = @@ -1870,7 +1890,7 @@ t_subscription_deleted_while_consumer_is_running(Config) -> {ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef1), - ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch({ok, connected}, health_check(Config)), ok end, [] @@ -1880,7 +1900,6 @@ t_subscription_deleted_while_consumer_is_running(Config) -> t_subscription_and_topic_deleted_while_consumer_is_running(Config) -> ct:timetrap({seconds, 90}), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), ?check_trace( begin {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = @@ -1896,7 +1915,11 @@ t_subscription_and_topic_deleted_while_consumer_is_running(Config) -> delete_subscription(Config, SubscriptionId), {ok, _} = ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_terminate}, 60_000), - ?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?retry( + _Sleep0 = 100, + _Retries = 20, + ?assertMatch({ok, disconnected}, health_check(Config)) + ), ok end, [] @@ -2161,7 +2184,6 @@ t_get_subscription(Config) -> t_permission_denied_topic_check(Config) -> [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), ?check_trace( begin %% the emulator does not check any credentials @@ -2178,19 +2200,17 @@ t_permission_denied_topic_check(Config) -> end end, fun() -> - {{ok, _}, {ok, _}} = - ?wait_async_action( - create_bridge(Config), - #{?snk_kind := gcp_pubsub_stop}, - 5_000 - ), + {ok, _} = create_bridge(Config), ?assertMatch( {ok, disconnected}, - emqx_resource_manager:health_check(ResourceId) + health_check(Config) ), ?assertMatch( - {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := disconnected, + error := {unhealthy_target, "Permission denied" ++ _} + }, + health_check_channel(Config) ), ok end @@ -2236,7 +2256,6 @@ t_permission_denied_worker(Config) -> t_unauthenticated_topic_check(Config) -> [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), - ResourceId = resource_id(Config), ?check_trace( begin %% the emulator does not check any credentials @@ -2253,19 +2272,17 @@ t_unauthenticated_topic_check(Config) -> end end, fun() -> - {{ok, _}, {ok, _}} = - ?wait_async_action( - create_bridge(Config), - #{?snk_kind := gcp_pubsub_stop}, - 5_000 - ), + {ok, _} = create_bridge(Config), ?assertMatch( {ok, disconnected}, - emqx_resource_manager:health_check(ResourceId) + health_check(Config) ), ?assertMatch( - {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := disconnected, + error := {unhealthy_target, "Permission denied" ++ _} + }, + health_check_channel(Config) ), ok end diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4679e1bc4..99b63c50c 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -26,6 +26,8 @@ resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(gcp_pubsub_consumer) -> + emqx_bridge_gcp_pubsub_impl_consumer; resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(kafka_producer) -> @@ -118,6 +120,14 @@ connector_structs() -> required => false } )}, + {gcp_pubsub_consumer, + mk( + hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")), + #{ + desc => <<"GCP PubSub Consumer Connector Config">>, + required => false + } + )}, {gcp_pubsub_producer, mk( hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")), @@ -309,6 +319,7 @@ schema_modules() -> [ emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, + emqx_bridge_gcp_pubsub_consumer_schema, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_kafka, emqx_bridge_kinesis, @@ -344,6 +355,11 @@ api_schemas(Method) -> api_ref( emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" ), + api_ref( + emqx_bridge_gcp_pubsub_consumer_schema, + <<"gcp_pubsub_consumer">>, + Method ++ "_connector" + ), api_ref( emqx_bridge_gcp_pubsub_producer_schema, <<"gcp_pubsub_producer">>, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 27d7f6379..c73923eb6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -126,6 +126,8 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; +connector_type_to_bridge_types(gcp_pubsub_consumer) -> + [gcp_pubsub_consumer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(kafka_producer) -> @@ -527,14 +529,6 @@ fields(connectors) -> required => false } )} - % {mqtt_subscriber, - % mk( - % hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")), - % #{ - % desc => <<"MQTT Subscriber Connector Config">>, - % required => false - % } - % )} ] ++ enterprise_fields_connectors(); fields("node_status") -> [ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index faf96ac9b..780745571 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -115,6 +115,7 @@ | {error, term()}. -type action_resource_id() :: resource_id(). +-type source_resource_id() :: resource_id(). -type connector_resource_id() :: resource_id(). -type message_tag() :: action_resource_id(). diff --git a/rel/i18n/emqx_bridge_gcp_pubsub_consumer_schema.hocon b/rel/i18n/emqx_bridge_gcp_pubsub_consumer_schema.hocon new file mode 100644 index 000000000..1f1e444c0 --- /dev/null +++ b/rel/i18n/emqx_bridge_gcp_pubsub_consumer_schema.hocon @@ -0,0 +1,18 @@ +emqx_bridge_gcp_pubsub_consumer_schema { + + source_parameters.desc: + """Source specific configs.""" + source_parameters.label: + """Source""" + + consumer_source.desc: + """Source configs.""" + consumer_source.label: + """Source""" + + config_connector.desc: + """Configuration for a GCP PubSub Consumer Client.""" + config_connector.label: + """GCP PubSub Consumer Client Configuration""" + +}