fix(emqx_bridge_v2): properly working start function

This commit is contained in:
Kjell Winblad 2023-10-28 14:00:05 +02:00
parent babf5f973a
commit e1009998c9
1 changed files with 35 additions and 65 deletions

View File

@ -48,8 +48,6 @@
health_check/2,
send_message/4,
start/2,
stop/2,
restart/2,
reset_metrics/2,
create_dry_run/2,
get_metrics/2
@ -351,49 +349,42 @@ disable_enable(Action, BridgeType, BridgeName) when
#{override_to => cluster}
).
restart(Type, Name) ->
stop(Type, Name),
start(Type, Name).
%% Manually start connector. This function can speed up reconnection when
%% waiting for auto reconnection. The function forwards the start request to
%% its connector.
start(BridgeV2Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:start(ConnectorType, ConnectorName)
end,
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun).
%% TODO: it is not clear what these operations should do
stop(Type, Name) ->
%% Stop means that we should remove the channel from the connector and reset the metrics
%% The emqx_resource_buffer_worker is not stopped
stop_helper(Type, Name, lookup_conf(Type, Name)).
stop_helper(_Type, _Name, #{enable := false}) ->
ok;
stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id),
ConnectorId = emqx_connector_resource:resource_id(
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
start(Type, Name) ->
%% Start means that we should add the channel to the connector (if it is not already there)
start_helper(Type, Name, lookup_conf(Type, Name)).
start_helper(_Type, _Name, #{enable := false}) ->
ok;
start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
ConfigWithTypeAndName = Config#{
bridge_type => bin(BridgeV2Type),
bridge_name => bin(BridgeName)
},
emqx_resource_manager:add_channel(
ConnectorId,
BridgeV2Id,
ConfigWithTypeAndName
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) ->
connector_operation_helper_with_conf(
BridgeV2Type,
lookup_conf(BridgeV2Type, Name),
ConnectorOpFun
).
connector_operation_helper_with_conf(
_BridgeV2Type,
{error, bridge_not_found} = Error,
_ConnectorOpFun
) ->
Error;
connector_operation_helper_with_conf(
_BridgeV2Type,
#{enable := false},
_ConnectorOpFun
) ->
ok;
connector_operation_helper_with_conf(
BridgeV2Type,
#{connector := ConnectorName},
ConnectorOpFun
) ->
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
ConnectorOpFun(ConnectorType, ConnectorName).
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
@ -1266,8 +1257,8 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
true ->
bridge_v1_operation_helper_with_conf(
BridgeV1Type,
connector_operation_helper_with_conf(
BridgeV2Type,
lookup_conf(BridgeV2Type, Name),
ConnectorOpFun
);
@ -1275,27 +1266,6 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) ->
{error, not_bridge_v1_compatible}
end.
bridge_v1_operation_helper_with_conf(
_BridgeV1Type,
{error, bridge_not_found} = Error,
_ConnectorOpFun
) ->
Error;
bridge_v1_operation_helper_with_conf(
_BridgeV1Type,
#{enable := false},
_ConnectorOpFun
) ->
ok;
bridge_v1_operation_helper_with_conf(
BridgeV1Type,
#{connector := ConnectorName},
ConnectorOpFun
) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
ConnectorOpFun(ConnectorType, ConnectorName).
%%====================================================================
%% Misc helper functions
%%====================================================================