diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 6eb156232..76bf32c07 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -330,7 +330,13 @@ lookup(Type, Name, RawConf) -> get_metrics(Type, Name) -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - emqx_bridge_v2:get_metrics(Type, Name); + case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + true -> + BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), + emqx_bridge_v2:get_metrics(BridgeV2Type, Name); + false -> + {error, not_bridge_v1_compatible} + end; false -> emqx_resource:get_metrics(emqx_bridge_resource:resource_id(Type, Name)) end. @@ -347,7 +353,12 @@ disable_enable(Action, BridgeType, BridgeName) when -> case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:disable_enable(Action, BridgeType, BridgeName); + case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + true -> + do_disable_enable_bridge_v2_compatible(Action, BridgeType, BridgeName); + false -> + {error, not_bridge_v1_compatible} + end; false -> emqx_conf:update( config_key_path() ++ [BridgeType, BridgeName], @@ -356,6 +367,15 @@ disable_enable(Action, BridgeType, BridgeName) when ) end. +do_disable_enable_bridge_v2_compatible(enable, BridgeType, BridgeName) -> + BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType), + _ = emqx_connector:disable_enable(enable, BridgeType, BridgeName), + emqx_bridge_v2:disable_enable(enable, BridgeV2Type, BridgeName); +do_disable_enable_bridge_v2_compatible(disable, BridgeType, BridgeName) -> + BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType), + _ = emqx_bridge_v2:disable_enable(disable, BridgeV2Type, BridgeName), + emqx_connector:disable_enable(disable, BridgeType, BridgeName). + create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ bridge_action => create, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ced78e08d..949082606 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -460,6 +460,8 @@ schema("/bridges_probe") -> case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> ?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>); + {error, not_bridge_v1_compatible} -> + ?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg()); {error, not_found} -> Conf = filter_out_request_body(Conf0), create_bridge(BridgeType, BridgeName, Conf) @@ -487,11 +489,12 @@ schema("/bridges_probe") -> case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, #{raw_config := RawConf}} -> %% TODO will the maybe_upgrade step done by emqx_bridge:lookup cause any problems - %%RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), Conf = deobfuscate(Conf1, RawConf), update_bridge(BridgeType, BridgeName, Conf); {error, not_found} -> - ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); + {error, not_bridge_v1_compatible} -> + ?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg()) end ); '/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> @@ -519,7 +522,9 @@ schema("/bridges_probe") -> ?INTERNAL_ERROR(Reason) end; {error, not_found} -> - ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); + {error, not_bridge_v1_compatible} -> + ?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg()) end ). @@ -529,11 +534,16 @@ schema("/bridges_probe") -> '/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, - begin - ok = emqx_bridge_resource:reset_metrics( - emqx_bridge_resource:resource_id(BridgeType, BridgeName) - ), - ?NO_CONTENT + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> + BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType), + ok = emqx_bridge_v2:reset_metrics(BridgeV2Type, BridgeName), + ?NO_CONTENT; + false -> + ok = emqx_bridge_resource:reset_metrics( + emqx_bridge_resource:resource_id(BridgeType, BridgeName) + ), + ?NO_CONTENT end ). @@ -592,6 +602,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); + {ok, [{error, not_bridge_v1_compatible} | _]} -> + ?NOT_FOUND(non_compat_bridge_msg()); {error, Reason} -> ?INTERNAL_ERROR(Reason) end. @@ -606,7 +618,17 @@ create_bridge(BridgeType, BridgeName, Conf) -> create_or_update_bridge(BridgeType, BridgeName, Conf, 201). update_bridge(BridgeType, BridgeName, Conf) -> - create_or_update_bridge(BridgeType, BridgeName, Conf, 200). + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> + case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + true -> + create_or_update_bridge(BridgeType, BridgeName, Conf, 200); + false -> + ?NOT_FOUND(non_compat_bridge_msg()) + end; + false -> + create_or_update_bridge(BridgeType, BridgeName, Conf, 200) + end. create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of @@ -1119,3 +1141,6 @@ map_to_json(M0) -> M2 = maps:without([value, <<"value">>], M1), emqx_utils_json:encode(M2) end. + +non_compat_bridge_msg() -> + <<"bridge already exists as non Bridge V1 compatible Bridge V2 bridge">>. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 81556f062..8abb1eb2f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -172,7 +172,13 @@ reset_metrics(ResourceId) -> false -> emqx_resource:reset_metrics(ResourceId); true -> - emqx_bridge_v2:reset_metrics(Type, Name) + case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + true -> + BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), + emqx_bridge_v2:reset_metrics(BridgeV2Type, Name); + false -> + {error, not_bridge_v1_compatible} + end end. restart(Type, Name) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index c40a5a20e..6c1ad2e55 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -68,7 +68,6 @@ %% Exported for tests -export([ - bridge_v2_type_to_connector_type/1, id/2, id/3, is_valid_bridge_v1/2 @@ -84,6 +83,7 @@ %% Compatibility API -export([ + bridge_v2_type_to_connector_type/1, is_bridge_v2_type/1, lookup_and_transform_to_bridge_v1/2, list_and_transform_to_bridge_v1/0, @@ -100,17 +100,47 @@ %%==================================================================== load() -> + load_bridges(), load_message_publish_hook(), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok. +load_bridges() -> + Bridges = emqx:get_config([?ROOT_KEY], #{}), + lists:foreach( + fun({Type, Bridge}) -> + lists:foreach( + fun({Name, BridgeConf}) -> + install_bridge_v2(Type, Name, BridgeConf) + end, + maps:to_list(Bridge) + ) + end, + maps:to_list(Bridges) + ). + unload() -> + unload_bridges(), unload_message_publish_hook(), emqx_conf:remove_handler(config_key_path()), emqx_conf:remove_handler(config_key_path_leaf()), ok. +unload_bridges() -> + Bridges = emqx:get_config([?ROOT_KEY], #{}), + lists:foreach( + fun({Type, Bridge}) -> + lists:foreach( + fun({Name, BridgeConf}) -> + uninstall_bridge_v2(Type, Name, BridgeConf) + end, + maps:to_list(Bridge) + ) + end, + maps:to_list(Bridges) + ). + %%==================================================================== %% CRUD API %%==================================================================== @@ -131,13 +161,26 @@ lookup(Type, Name) -> {ok, _, Data} -> Data; {error, not_found} -> - undefined + #{} + end, + %% Find the Bridge V2 status from the InstanceData + Channels = maps:get(added_channels, InstanceData, #{}), + BridgeV2Id = id(Type, Name, BridgeConnector), + ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), + DisplayBridgeV2Status = + case ChannelStatus of + {error, undefined} -> <<"Unknown reason">>; + {error, Reason} -> emqx_utils:readable_error_msg(Reason); + connected -> <<"connected">>; + connecting -> <<"connecting">>; + Error -> emqx_utils:readable_error_msg(Error) end, {ok, #{ type => Type, name => Name, raw_config => RawConf, - resource_data => InstanceData + resource_data => InstanceData, + status => DisplayBridgeV2Status }} end. @@ -178,6 +221,8 @@ check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> case emqx_rule_engine:get_rule_ids_by_action(BridgeId) of [] -> remove(BridgeType, BridgeName); + _RuleIds when RemoveDeps =:= ignore_deps -> + remove(BridgeType, BridgeName); RuleIds when RemoveDeps =:= false -> {error, {rules_deps_on_this_bridge, RuleIds}}; RuleIds when RemoveDeps =:= true -> @@ -599,7 +644,10 @@ get_channels_for_connector(ConnectorId) -> get_channels_for_connector(ConnectorName, BridgeV2Type) -> BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), [ - {id(BridgeV2Type, Name, ConnectorName), Conf} + {id(BridgeV2Type, Name, ConnectorName), Conf#{ + bridge_name => bin(Name), + bridge_type => bin(BridgeV2Type) + }} || {Name, Conf} <- maps:to_list(BridgeV2s), bin(ConnectorName) =:= maps:get(connector, Conf, no_name) ]. @@ -824,7 +872,8 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup_raw_conf(BridgeV2Type, BridgeName) of {error, _} -> - false; + %% If the bridge v2 does not exist, it is a valid bridge v1 + true; #{connector := ConnectorName} -> ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), @@ -854,25 +903,37 @@ is_bridge_v2_type(_) -> false. list_and_transform_to_bridge_v1() -> - list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2). + Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2), + [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. -lookup_and_transform_to_bridge_v1(Type, Name) -> - case lookup(Type, Name) of - {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} -> - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), - case emqx_connector:lookup(ConnectorType, ConnectorName) of - {ok, Connector} -> - lookup_and_transform_to_bridge_v1_helper( - Type, BridgeV2, ConnectorType, Connector - ); +lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> + case is_valid_bridge_v1(BridgeV1Type, Name) of + true -> + Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + case lookup(Type, Name) of + {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} -> + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, Connector} -> + lookup_and_transform_to_bridge_v1_helper( + BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector + ); + Error -> + Error + end; Error -> Error end; - Error -> - Error + false -> + not_bridge_v1_compatible_error() end. -lookup_and_transform_to_bridge_v1_helper(BridgeV2Type, BridgeV2, ConnectorType, Connector) -> +not_bridge_v1_compatible_error() -> + {error, not_bridge_v1_compatible}. + +lookup_and_transform_to_bridge_v1_helper( + BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector +) -> ConnectorRawConfig1 = maps:get(raw_config, Connector), ConnectorRawConfig2 = fill_defaults( ConnectorType, @@ -890,7 +951,29 @@ lookup_and_transform_to_bridge_v1_helper(BridgeV2Type, BridgeV2, ConnectorType, BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2), BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2), BridgeV1 = maps:put(raw_config, BridgeV1Config2, BridgeV2), - {ok, BridgeV1}. + BridgeV1_1 = maps:remove(status, BridgeV1), + BridgeV2Status = maps:get(status, BridgeV2, undefined), + ResourceData1 = maps:get(resource_data, BridgeV1_1, #{}), + %% Replace id in resouce data + BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, + ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1), + ConnectorStatus = maps:get(status, ResourceData2, undefined), + case ConnectorStatus of + connected -> + case BridgeV2Status of + <<"connected">> -> + %% No need to modify the status + {ok, BridgeV1_1#{resource_data => ResourceData2}}; + NotConnected -> + ResourceData3 = maps:put(status, connecting, ResourceData2), + ResourceData4 = maps:put(error, NotConnected, ResourceData3), + BridgeV1_2 = maps:put(resource_data, ResourceData4, BridgeV1_1), + {ok, BridgeV1_2} + end; + _ -> + %% No need to modify the status + {ok, BridgeV1_1} + end. lookup_raw_conf(Type, Name) -> case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of @@ -901,6 +984,25 @@ lookup_raw_conf(Type, Name) -> end. split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> + BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + %% Check if the bridge v2 exists + case lookup_raw_conf(BridgeV2Type, BridgeName) of + {error, _} -> + %% If the bridge v2 does not exist, it is a valid bridge v1 + split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); + _Conf -> + case is_valid_bridge_v1(BridgeV1Type, BridgeName) of + true -> + %% Remove and create if update operation + bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, ignore_deps), + split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf); + false -> + %% If the bridge v2 exists, it is not a valid bridge v1 + {error, non_compatible_bridge_v2_exists} + end + end. + +split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) -> #{ connector_type := ConnectorType, connector_name := NewConnectorName, @@ -922,7 +1024,6 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> {ok, _} -> Error; Error -> - %% TODO log error ?SLOG(warning, #{ message => <<"Failed to remove connector after bridge creation failed">>, @@ -938,17 +1039,17 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> Error end. -split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> +split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) -> %% Create fake global config for the transformation and then call %% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1 - - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), + BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), %% Needed so name confligts will ba avoided CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}), FakeGlobalConfig = #{ <<"connectors">> => CurrentConnectorsConfig, <<"bridges">> => #{ - bin(BridgeType) => #{ + bin(BridgeV1Type) => #{ bin(BridgeName) => RawConf } } @@ -960,7 +1061,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> emqx_utils_maps:deep_get( [ bin(?ROOT_KEY), - bin(BridgeType), + bin(BridgeV2Type), bin(BridgeName) ], Output @@ -1004,7 +1105,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> } }, <<"bridges_v2">> => #{ - bin(BridgeType) => #{ + bin(BridgeV2Type) => #{ bin(BridgeName) => NewBridgeV2RawConf } } @@ -1021,7 +1122,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> connector_type => ConnectorType, connector_name => NewConnectorName, connector_conf => NewConnectorRawConf, - bridge_v2_type => BridgeType, + bridge_v2_type => BridgeV2Type, bridge_v2_name => BridgeName, bridge_v2_conf => NewBridgeV2RawConf } @@ -1038,18 +1139,19 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> connector_type := _ConnectorType, connector_name := _NewConnectorName, connector_conf := ConnectorRawConf, - bridge_v2_type := BridgeType, + bridge_v2_type := BridgeV2Type, bridge_v2_name := _BridgeName, bridge_v2_conf := BridgeV2RawConf } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), - create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf). + create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). -bridge_v1_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> +bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> + BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), bridge_v1_check_deps_and_remove( - BridgeType, + BridgeV2Type, BridgeName, RemoveDeps, - lookup_raw_conf(BridgeType, BridgeName) + lookup_raw_conf(BridgeV2Type, BridgeName) ). bridge_v1_check_deps_and_remove( diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 7d194ef1b..0d6ee86f0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -258,7 +258,8 @@ t_is_valid_bridge_v1(_) -> {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2), {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2), - false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + %% Non existing bridge is a valid Bridge V1 + true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), ok. t_manual_health_check(_) -> @@ -599,7 +600,7 @@ t_remove_single_connector_being_referenced_without_active_channels(_Config) -> ?assertMatch({ok, _}, emqx_connector:remove(con_type(), con_name())), %% we no longer have connector data if this happens... ?assertMatch( - {ok, #{resource_data := undefined}}, + {ok, #{resource_data := #{}}}, emqx_bridge_v2:lookup(bridge_type(), BridgeName) ), ok @@ -638,7 +639,7 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> ), %% we no longer have connector data if this happens... ?assertMatch( - {ok, #{resource_data := undefined}}, + {ok, #{resource_data := #{}}}, emqx_bridge_v2:lookup(bridge_type(), BridgeName) ), ok