feat: let bridge V2 override bridge creation opts from connector
This commit is contained in:
parent
1b248279fd
commit
449b01ef78
|
@ -270,12 +270,34 @@ install_bridge_v2(
|
||||||
) ->
|
) ->
|
||||||
ok;
|
ok;
|
||||||
install_bridge_v2(
|
install_bridge_v2(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
Config
|
||||||
|
) ->
|
||||||
|
install_bridge_v2_helper(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
combine_connector_and_bridge_v2_config(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
Config
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
install_bridge_v2_helper(
|
||||||
|
_BridgeV2Type,
|
||||||
|
_BridgeName,
|
||||||
|
{error, Reason} = Error
|
||||||
|
) ->
|
||||||
|
?SLOG(error, Reason),
|
||||||
|
Error;
|
||||||
|
install_bridge_v2_helper(
|
||||||
BridgeV2Type,
|
BridgeV2Type,
|
||||||
BridgeName,
|
BridgeName,
|
||||||
#{connector := ConnectorName} = Config
|
#{connector := ConnectorName} = Config
|
||||||
) ->
|
) ->
|
||||||
CreationOpts = emqx_resource:fetch_creation_opts(Config),
|
|
||||||
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
|
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
|
||||||
|
CreationOpts = emqx_resource:fetch_creation_opts(Config),
|
||||||
%% Create metrics for Bridge V2
|
%% Create metrics for Bridge V2
|
||||||
ok = emqx_resource:create_metrics(BridgeV2Id),
|
ok = emqx_resource:create_metrics(BridgeV2Id),
|
||||||
%% We might need to create buffer workers for Bridge V2
|
%% We might need to create buffer workers for Bridge V2
|
||||||
|
@ -315,6 +337,28 @@ uninstall_bridge_v2(
|
||||||
%% Already not installed
|
%% Already not installed
|
||||||
ok;
|
ok;
|
||||||
uninstall_bridge_v2(
|
uninstall_bridge_v2(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
Config
|
||||||
|
) ->
|
||||||
|
uninstall_bridge_v2_helper(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
combine_connector_and_bridge_v2_config(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
Config
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
uninstall_bridge_v2_helper(
|
||||||
|
_BridgeV2Type,
|
||||||
|
_BridgeName,
|
||||||
|
{error, Reason} = Error
|
||||||
|
) ->
|
||||||
|
?SLOG(error, Reason),
|
||||||
|
Error;
|
||||||
|
uninstall_bridge_v2_helper(
|
||||||
BridgeV2Type,
|
BridgeV2Type,
|
||||||
BridgeName,
|
BridgeName,
|
||||||
#{connector := ConnectorName} = Config
|
#{connector := ConnectorName} = Config
|
||||||
|
@ -329,6 +373,31 @@ uninstall_bridge_v2(
|
||||||
),
|
),
|
||||||
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
|
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
|
||||||
|
|
||||||
|
combine_connector_and_bridge_v2_config(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeName,
|
||||||
|
#{connector := ConnectorName} = BridgeV2Config
|
||||||
|
) ->
|
||||||
|
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
|
||||||
|
try emqx_config:get([connectors, ConnectorType, to_existing_atom(ConnectorName)]) of
|
||||||
|
ConnectorConfig ->
|
||||||
|
ConnectorCreationOpts = emqx_resource:fetch_creation_opts(ConnectorConfig),
|
||||||
|
BridgeV2CreationOpts = emqx_resource:fetch_creation_opts(BridgeV2Config),
|
||||||
|
CombinedCreationOpts = emqx_utils_maps:deep_merge(
|
||||||
|
ConnectorCreationOpts,
|
||||||
|
BridgeV2CreationOpts
|
||||||
|
),
|
||||||
|
BridgeV2Config#{resource_opts => CombinedCreationOpts}
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
{error, #{
|
||||||
|
reason => "connector_not_found",
|
||||||
|
type => BridgeV2Type,
|
||||||
|
bridge_name => BridgeName,
|
||||||
|
connector_name => ConnectorName
|
||||||
|
}}
|
||||||
|
end.
|
||||||
|
|
||||||
%% Creates the external id for the bridge_v2 that is used by the rule actions
|
%% Creates the external id for the bridge_v2 that is used by the rule actions
|
||||||
%% to refer to the bridge_v2
|
%% to refer to the bridge_v2
|
||||||
external_id(BridgeType, BridgeName) ->
|
external_id(BridgeType, BridgeName) ->
|
||||||
|
@ -402,7 +471,8 @@ get_query_mode(BridgeV2Type, Config) ->
|
||||||
|
|
||||||
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
|
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
|
||||||
case lookup_conf(BridgeType, BridgeName) of
|
case lookup_conf(BridgeType, BridgeName) of
|
||||||
#{enable := true} = Config ->
|
#{enable := true} = Config0 ->
|
||||||
|
Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0),
|
||||||
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
|
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
|
||||||
#{enable := false} ->
|
#{enable := false} ->
|
||||||
{error, bridge_stopped};
|
{error, bridge_stopped};
|
||||||
|
@ -410,6 +480,11 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
|
||||||
{error, bridge_not_found}
|
{error, bridge_not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_send_msg_with_enabled_config(
|
||||||
|
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
|
||||||
|
) ->
|
||||||
|
?SLOG(error, Reason),
|
||||||
|
Error;
|
||||||
do_send_msg_with_enabled_config(
|
do_send_msg_with_enabled_config(
|
||||||
BridgeType, BridgeName, Message, QueryOpts0, Config
|
BridgeType, BridgeName, Message, QueryOpts0, Config
|
||||||
) ->
|
) ->
|
||||||
|
|
Loading…
Reference in New Issue