diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 8abb1eb2f..dd0a1fee8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -186,7 +186,7 @@ restart(Type, Name) -> false -> emqx_resource:restart(resource_id(Type, Name)); true -> - emqx_bridge_v2:restart(Type, Name) + emqx_bridge_v2:bridge_v1_restart(Type, Name) end. stop(Type, Name) -> @@ -194,7 +194,7 @@ stop(Type, Name) -> false -> emqx_resource:stop(resource_id(Type, Name)); true -> - emqx_bridge_v2:stop(Type, Name) + emqx_bridge_v2:bridge_v1_stop(Type, Name) end. start(Type, Name) -> @@ -202,7 +202,7 @@ start(Type, Name) -> false -> emqx_resource:start(resource_id(Type, Name)); true -> - emqx_bridge_v2:start(Type, Name) + emqx_bridge_v2:bridge_v1_start(Type, Name) end. create(BridgeId, Conf) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7e8de12c9..d94263b6b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -93,7 +93,10 @@ extract_connector_id_from_bridge_v2_id/1, bridge_v1_type_to_bridge_v2_type/1, bridge_v1_id_to_connector_resource_id/1, - bridge_v1_enable_disable/3 + bridge_v1_enable_disable/3, + bridge_v1_restart/2, + bridge_v1_stop/2, + bridge_v1_start/2 ]). %%==================================================================== @@ -357,7 +360,7 @@ restart(Type, Name) -> stop(Type, Name) -> %% Stop means that we should remove the channel from the connector and reset the metrics %% The emqx_resource_buffer_worker is not stopped - stop_helper(Type, Name, lookup_raw_conf(Type, Name)). + stop_helper(Type, Name, lookup_conf(Type, Name)). stop_helper(_Type, _Name, #{enable := false}) -> ok; @@ -371,7 +374,7 @@ stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> start(Type, Name) -> %% Start means that we should add the channel to the connector (if it is not already there) - start_helper(Type, Name, lookup_raw_conf(Type, Name)). + start_helper(Type, Name, lookup_conf(Type, Name)). start_helper(_Type, _Name, #{enable := false}) -> ok; @@ -392,7 +395,7 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) - ). reset_metrics(Type, Name) -> - reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)). + reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). reset_metrics_helper(_Type, _Name, #{enable := false}) -> ok; @@ -407,7 +410,7 @@ get_query_mode(BridgeV2Type, Config) -> emqx_resource:query_mode(ResourceType, Config, CreationOpts). send_message(BridgeType, BridgeName, Message, QueryOpts0) -> - case lookup_raw_conf(BridgeType, BridgeName) of + case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config -> do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); #{enable := false} -> @@ -431,7 +434,7 @@ do_send_msg_with_enabled_config( emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). health_check(BridgeType, BridgeName) -> - case lookup_raw_conf(BridgeType, BridgeName) of + case lookup_conf(BridgeType, BridgeName) of #{ enable := true, connector := ConnectorName @@ -658,7 +661,7 @@ get_channels_for_connector(ConnectorName, BridgeV2Type) -> %%==================================================================== id(BridgeType, BridgeName) -> - case lookup_raw_conf(BridgeType, BridgeName) of + case lookup_conf(BridgeType, BridgeName) of #{connector := ConnectorName} -> id(BridgeType, BridgeName, ConnectorName); Error -> @@ -871,7 +874,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% * The connector for the bridge v2 should have exactly on channel is_valid_bridge_v1(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - case lookup_raw_conf(BridgeV2Type, BridgeName) of + case lookup_conf(BridgeV2Type, BridgeName) of {error, _} -> %% If the bridge v2 does not exist, it is a valid bridge v1 true; @@ -973,10 +976,10 @@ lookup_and_transform_to_bridge_v1_helper( end; _ -> %% No need to modify the status - {ok, BridgeV1} + {ok, BridgeV1#{resource_data => ResourceData2}} end. -lookup_raw_conf(Type, Name) -> +lookup_conf(Type, Name) -> case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of not_found -> {error, bridge_not_found}; @@ -987,7 +990,7 @@ lookup_raw_conf(Type, Name) -> split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), %% Check if the bridge v2 exists - case lookup_raw_conf(BridgeV2Type, BridgeName) of + case lookup_conf(BridgeV2Type, BridgeName) of {error, _} -> %% If the bridge v2 does not exist, it is a valid bridge v1 split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); @@ -1152,7 +1155,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> BridgeV2Type, BridgeName, RemoveDeps, - lookup_raw_conf(BridgeV2Type, BridgeName) + lookup_conf(BridgeV2Type, BridgeName) ). bridge_v1_check_deps_and_remove( @@ -1205,7 +1208,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) -> [Type, Name] -> BridgeV2Type = bin(?MODULE:bridge_v1_type_to_bridge_v2_type(Type)), ConnectorName = - case lookup_raw_conf(BridgeV2Type, Name) of + case lookup_conf(BridgeV2Type, Name) of #{connector := Con} -> Con; Error -> @@ -1222,7 +1225,7 @@ bridge_v1_enable_disable(Action, BridgeType, BridgeName) -> Action, BridgeType, BridgeName, - lookup_raw_conf(BridgeType, BridgeName) + lookup_conf(BridgeType, BridgeName) ); false -> {error, not_bridge_v1_compatible} @@ -1240,6 +1243,59 @@ bridge_v1_enable_disable_helper(disable, BridgeType, BridgeName, #{connector := ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), {ok, _} = emqx_bridge_v2:disable_enable(disable, BridgeV2Type, BridgeName), emqx_connector:disable_enable(disable, ConnectorType, ConnectorName). + +bridge_v1_restart(BridgeV1Type, Name) -> + ConnectorOpFun = fun(ConnectorType, ConnectorName) -> + emqx_connector_resource:restart(ConnectorType, ConnectorName) + end, + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + +bridge_v1_stop(BridgeV1Type, Name) -> + ConnectorOpFun = fun(ConnectorType, ConnectorName) -> + emqx_connector_resource:stop(ConnectorType, ConnectorName) + end, + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + +bridge_v1_start(BridgeV1Type, Name) -> + ConnectorOpFun = fun(ConnectorType, ConnectorName) -> + emqx_connector_resource:start(ConnectorType, ConnectorName) + end, + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + +bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) -> + BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of + true -> + bridge_v1_operation_helper_with_conf( + BridgeV1Type, + lookup_conf(BridgeV2Type, Name), + ConnectorOpFun + ); + false -> + {error, not_bridge_v1_compatible} + end. + +bridge_v1_operation_helper_with_conf( + _BridgeV1Type, + {error, bridge_not_found} = Error, + _ConnectorOpFun +) -> + Error; +bridge_v1_operation_helper_with_conf( + _BridgeV1Type, + #{enable := false}, + _ConnectorOpFun +) -> + ok; +bridge_v1_operation_helper_with_conf( + BridgeV1Type, + #{connector := ConnectorName}, + ConnectorOpFun +) -> + BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), + ConnectorOpFun(ConnectorType, ConnectorName). + %%==================================================================== %% Misc helper functions %%====================================================================