diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl index 362b671ad..186564bef 100644 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -120,8 +120,15 @@ internal_register_bridge_type_with_lock(BridgeTypeInfo) -> InfoMap7, maps:get(schema_struct_field, BridgeTypeInfo) ), - - ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap8). + InfoMap9 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_bridge_v1_type, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap8, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ), + ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9). internal_maybe_create_initial_bridge_v2_info_map() -> case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of @@ -131,6 +138,7 @@ internal_maybe_create_initial_bridge_v2_info_map() -> #{ bridge_v2_type_names => #{}, bridge_v1_type_to_bridge_v2_type => #{}, + bridge_v2_type_to_bridge_v1_type => #{}, bridge_v2_type_to_connector_type => #{}, bridge_v2_type_to_schema_module => #{}, bridge_v2_type_to_schema_struct_field => #{} diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index b11344ee1..e8ea422f6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -53,20 +53,16 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) -> end. %% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1. -upgrade_type(kafka) -> - kafka_producer; -upgrade_type(<<"kafka">>) -> - <<"kafka_producer">>; -upgrade_type(Other) -> - Other. +upgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type); +upgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)). %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1 -downgrade_type(kafka_producer) -> - kafka; -downgrade_type(<<"kafka_producer">>) -> - <<"kafka">>; -downgrade_type(Other) -> - Other. +downgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type); +downgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)). %% A rule might be referencing an old version bridge type name %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 1a0160b46..d4a621452 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -128,6 +128,8 @@ bridge_v1_split_config_and_create/3, bridge_v1_create_dry_run/2, bridge_v1_type_to_bridge_v2_type/1, + %% Exception from the naming convention: + bridge_v2_type_to_bridge_v1_type/1, bridge_v1_id_to_connector_resource_id/1, bridge_v1_enable_disable/3, bridge_v1_restart/2, @@ -1140,7 +1142,24 @@ bridge_v1_type_to_bridge_v2_type(Type) -> % bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> % kafka_producer; bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v1_type_to_bridge_v2_type_old(Type) -> + Type. + +bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> + ?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); +bridge_v2_type_to_bridge_v1_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of + undefined -> bridge_v2_type_to_bridge_v1_type_old(Type); + BridgeV1Type -> BridgeV1Type + end. + +bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +bridge_v2_type_to_bridge_v1_type_old(Type) -> + Type. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index a4e659ce6..c2c52b6a6 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -311,6 +311,15 @@ t_rule_engine(_) -> {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}). t_downgrade_bridge_type(_) -> + case emqx_release:edition() of + ee -> + do_test_downgrade_bridge_type(); + ce -> + %% downgrade is not supported in CE + ok + end. + +do_test_downgrade_bridge_type() -> #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}), ?assertMatch( %% returns a bridges_v2 ID