fix: add start/stop/restart support to bridge v1 compatibility layer

This commit is contained in:
Kjell Winblad 2023-10-28 07:30:47 +02:00
parent 1dea3e1cc4
commit 917e13c0e9
2 changed files with 73 additions and 17 deletions

View File

@ -186,7 +186,7 @@ restart(Type, Name) ->
false ->
emqx_resource:restart(resource_id(Type, Name));
true ->
emqx_bridge_v2:restart(Type, Name)
emqx_bridge_v2:bridge_v1_restart(Type, Name)
end.
stop(Type, Name) ->
@ -194,7 +194,7 @@ stop(Type, Name) ->
false ->
emqx_resource:stop(resource_id(Type, Name));
true ->
emqx_bridge_v2:stop(Type, Name)
emqx_bridge_v2:bridge_v1_stop(Type, Name)
end.
start(Type, Name) ->
@ -202,7 +202,7 @@ start(Type, Name) ->
false ->
emqx_resource:start(resource_id(Type, Name));
true ->
emqx_bridge_v2:start(Type, Name)
emqx_bridge_v2:bridge_v1_start(Type, Name)
end.
create(BridgeId, Conf) ->

View File

@ -93,7 +93,10 @@
extract_connector_id_from_bridge_v2_id/1,
bridge_v1_type_to_bridge_v2_type/1,
bridge_v1_id_to_connector_resource_id/1,
bridge_v1_enable_disable/3
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
bridge_v1_stop/2,
bridge_v1_start/2
]).
%%====================================================================
@ -357,7 +360,7 @@ restart(Type, Name) ->
stop(Type, Name) ->
%% Stop means that we should remove the channel from the connector and reset the metrics
%% The emqx_resource_buffer_worker is not stopped
stop_helper(Type, Name, lookup_raw_conf(Type, Name)).
stop_helper(Type, Name, lookup_conf(Type, Name)).
stop_helper(_Type, _Name, #{enable := false}) ->
ok;
@ -371,7 +374,7 @@ stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
start(Type, Name) ->
%% Start means that we should add the channel to the connector (if it is not already there)
start_helper(Type, Name, lookup_raw_conf(Type, Name)).
start_helper(Type, Name, lookup_conf(Type, Name)).
start_helper(_Type, _Name, #{enable := false}) ->
ok;
@ -392,7 +395,7 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) -
).
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)).
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
reset_metrics_helper(_Type, _Name, #{enable := false}) ->
ok;
@ -407,7 +410,7 @@ get_query_mode(BridgeV2Type, Config) ->
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_raw_conf(BridgeType, BridgeName) of
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config ->
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
@ -431,7 +434,7 @@ do_send_msg_with_enabled_config(
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
health_check(BridgeType, BridgeName) ->
case lookup_raw_conf(BridgeType, BridgeName) of
case lookup_conf(BridgeType, BridgeName) of
#{
enable := true,
connector := ConnectorName
@ -658,7 +661,7 @@ get_channels_for_connector(ConnectorName, BridgeV2Type) ->
%%====================================================================
id(BridgeType, BridgeName) ->
case lookup_raw_conf(BridgeType, BridgeName) of
case lookup_conf(BridgeType, BridgeName) of
#{connector := ConnectorName} ->
id(BridgeType, BridgeName, ConnectorName);
Error ->
@ -871,7 +874,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
%% * The connector for the bridge v2 should have exactly on channel
is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case lookup_raw_conf(BridgeV2Type, BridgeName) of
case lookup_conf(BridgeV2Type, BridgeName) of
{error, _} ->
%% If the bridge v2 does not exist, it is a valid bridge v1
true;
@ -973,10 +976,10 @@ lookup_and_transform_to_bridge_v1_helper(
end;
_ ->
%% No need to modify the status
{ok, BridgeV1}
{ok, BridgeV1#{resource_data => ResourceData2}}
end.
lookup_raw_conf(Type, Name) ->
lookup_conf(Type, Name) ->
case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of
not_found ->
{error, bridge_not_found};
@ -987,7 +990,7 @@ lookup_raw_conf(Type, Name) ->
split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
%% Check if the bridge v2 exists
case lookup_raw_conf(BridgeV2Type, BridgeName) of
case lookup_conf(BridgeV2Type, BridgeName) of
{error, _} ->
%% If the bridge v2 does not exist, it is a valid bridge v1
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
@ -1152,7 +1155,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type,
BridgeName,
RemoveDeps,
lookup_raw_conf(BridgeV2Type, BridgeName)
lookup_conf(BridgeV2Type, BridgeName)
).
bridge_v1_check_deps_and_remove(
@ -1205,7 +1208,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) ->
[Type, Name] ->
BridgeV2Type = bin(?MODULE:bridge_v1_type_to_bridge_v2_type(Type)),
ConnectorName =
case lookup_raw_conf(BridgeV2Type, Name) of
case lookup_conf(BridgeV2Type, Name) of
#{connector := Con} ->
Con;
Error ->
@ -1222,7 +1225,7 @@ bridge_v1_enable_disable(Action, BridgeType, BridgeName) ->
Action,
BridgeType,
BridgeName,
lookup_raw_conf(BridgeType, BridgeName)
lookup_conf(BridgeType, BridgeName)
);
false ->
{error, not_bridge_v1_compatible}
@ -1240,6 +1243,59 @@ bridge_v1_enable_disable_helper(disable, BridgeType, BridgeName, #{connector :=
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
{ok, _} = emqx_bridge_v2:disable_enable(disable, BridgeV2Type, BridgeName),
emqx_connector:disable_enable(disable, ConnectorType, ConnectorName).
bridge_v1_restart(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:restart(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_stop(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:stop(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_start(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:start(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
true ->
bridge_v1_operation_helper_with_conf(
BridgeV1Type,
lookup_conf(BridgeV2Type, Name),
ConnectorOpFun
);
false ->
{error, not_bridge_v1_compatible}
end.
bridge_v1_operation_helper_with_conf(
_BridgeV1Type,
{error, bridge_not_found} = Error,
_ConnectorOpFun
) ->
Error;
bridge_v1_operation_helper_with_conf(
_BridgeV1Type,
#{enable := false},
_ConnectorOpFun
) ->
ok;
bridge_v1_operation_helper_with_conf(
BridgeV1Type,
#{connector := ConnectorName},
ConnectorOpFun
) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
ConnectorOpFun(ConnectorType, ConnectorName).
%%====================================================================
%% Misc helper functions
%%====================================================================