diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 2d48ddb0a..9485590cd 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -270,12 +270,34 @@ install_bridge_v2( ) -> ok; install_bridge_v2( + BridgeV2Type, + BridgeName, + Config +) -> + install_bridge_v2_helper( + BridgeV2Type, + BridgeName, + combine_connector_and_bridge_v2_config( + BridgeV2Type, + BridgeName, + Config + ) + ). + +install_bridge_v2_helper( + _BridgeV2Type, + _BridgeName, + {error, Reason} = Error +) -> + ?SLOG(error, Reason), + Error; +install_bridge_v2_helper( BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config ) -> - CreationOpts = emqx_resource:fetch_creation_opts(Config), BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + CreationOpts = emqx_resource:fetch_creation_opts(Config), %% Create metrics for Bridge V2 ok = emqx_resource:create_metrics(BridgeV2Id), %% We might need to create buffer workers for Bridge V2 @@ -315,6 +337,28 @@ uninstall_bridge_v2( %% Already not installed ok; uninstall_bridge_v2( + BridgeV2Type, + BridgeName, + Config +) -> + uninstall_bridge_v2_helper( + BridgeV2Type, + BridgeName, + combine_connector_and_bridge_v2_config( + BridgeV2Type, + BridgeName, + Config + ) + ). + +uninstall_bridge_v2_helper( + _BridgeV2Type, + _BridgeName, + {error, Reason} = Error +) -> + ?SLOG(error, Reason), + Error; +uninstall_bridge_v2_helper( BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config @@ -329,6 +373,31 @@ uninstall_bridge_v2( ), emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). +combine_connector_and_bridge_v2_config( + BridgeV2Type, + BridgeName, + #{connector := ConnectorName} = BridgeV2Config +) -> + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), + try emqx_config:get([connectors, ConnectorType, to_existing_atom(ConnectorName)]) of + ConnectorConfig -> + ConnectorCreationOpts = emqx_resource:fetch_creation_opts(ConnectorConfig), + BridgeV2CreationOpts = emqx_resource:fetch_creation_opts(BridgeV2Config), + CombinedCreationOpts = emqx_utils_maps:deep_merge( + ConnectorCreationOpts, + BridgeV2CreationOpts + ), + BridgeV2Config#{resource_opts => CombinedCreationOpts} + catch + _:_ -> + {error, #{ + reason => "connector_not_found", + type => BridgeV2Type, + bridge_name => BridgeName, + connector_name => ConnectorName + }} + end. + %% Creates the external id for the bridge_v2 that is used by the rule actions %% to refer to the bridge_v2 external_id(BridgeType, BridgeName) -> @@ -402,7 +471,8 @@ get_query_mode(BridgeV2Type, Config) -> send_message(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of - #{enable := true} = Config -> + #{enable := true} = Config0 -> + Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0), do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); #{enable := false} -> {error, bridge_stopped}; @@ -410,6 +480,11 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) -> {error, bridge_not_found} end. +do_send_msg_with_enabled_config( + _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error +) -> + ?SLOG(error, Reason), + Error; do_send_msg_with_enabled_config( BridgeType, BridgeName, Message, QueryOpts0, Config ) ->