fix(bridge_v1): no hard coded downgrade and upgrade type functions

This commit is contained in:
Kjell Winblad 2023-11-06 12:03:11 +01:00 committed by Ivan Dyachkov
parent a6aa81b548
commit d26a1b9afb
4 changed files with 47 additions and 15 deletions

View File

@ -120,8 +120,15 @@ internal_register_bridge_type_with_lock(BridgeTypeInfo) ->
InfoMap7,
maps:get(schema_struct_field, BridgeTypeInfo)
),
ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap8).
InfoMap9 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_to_bridge_v1_type,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
],
InfoMap8,
maps:get(bridge_v1_type_name, BridgeTypeInfo)
),
ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9).
internal_maybe_create_initial_bridge_v2_info_map() ->
case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of
@ -131,6 +138,7 @@ internal_maybe_create_initial_bridge_v2_info_map() ->
#{
bridge_v2_type_names => #{},
bridge_v1_type_to_bridge_v2_type => #{},
bridge_v2_type_to_bridge_v1_type => #{},
bridge_v2_type_to_connector_type => #{},
bridge_v2_type_to_schema_module => #{},
bridge_v2_type_to_schema_struct_field => #{}

View File

@ -53,20 +53,16 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) ->
end.
%% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1.
upgrade_type(kafka) ->
kafka_producer;
upgrade_type(<<"kafka">>) ->
<<"kafka_producer">>;
upgrade_type(Other) ->
Other.
upgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type);
upgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)).
%% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
downgrade_type(kafka_producer) ->
kafka;
downgrade_type(<<"kafka_producer">>) ->
<<"kafka">>;
downgrade_type(Other) ->
Other.
downgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type);
downgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)).
%% A rule might be referencing an old version bridge type name
%% i.e. 'kafka' instead of 'kafka_producer' so we need to try both

View File

@ -128,6 +128,8 @@
bridge_v1_split_config_and_create/3,
bridge_v1_create_dry_run/2,
bridge_v1_type_to_bridge_v2_type/1,
%% Exception from the naming convention:
bridge_v2_type_to_bridge_v1_type/1,
bridge_v1_id_to_connector_resource_id/1,
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
@ -1140,7 +1142,24 @@ bridge_v1_type_to_bridge_v2_type(Type) ->
% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) ->
% kafka_producer;
bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) ->
azure_event_hub_producer.
azure_event_hub_producer;
bridge_v1_type_to_bridge_v2_type_old(Type) ->
Type.
bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
bridge_v2_type_to_bridge_v1_type(Type) ->
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap),
case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of
undefined -> bridge_v2_type_to_bridge_v1_type_old(Type);
BridgeV1Type -> BridgeV1Type
end.
bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) ->
azure_event_hub_producer;
bridge_v2_type_to_bridge_v1_type_old(Type) ->
Type.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2

View File

@ -311,6 +311,15 @@ t_rule_engine(_) ->
{400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
t_downgrade_bridge_type(_) ->
case emqx_release:edition() of
ee ->
do_test_downgrade_bridge_type();
ce ->
%% downgrade is not supported in CE
ok
end.
do_test_downgrade_bridge_type() ->
#{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
?assertMatch(
%% returns a bridges_v2 ID