feat: add compatibilty layer function for checking if valid bridge_v1
This commit is contained in:
parent
e2b4fb3bda
commit
d8a9778d7c
|
@ -70,7 +70,8 @@
|
||||||
-export([
|
-export([
|
||||||
bridge_v2_type_to_connector_type/1,
|
bridge_v2_type_to_connector_type/1,
|
||||||
id/2,
|
id/2,
|
||||||
id/3
|
id/3,
|
||||||
|
is_valid_bridge_v1/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Config Update Handler API
|
%% Config Update Handler API
|
||||||
|
@ -752,8 +753,30 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% Compatibility API
|
%% Compatibility API
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
|
||||||
|
%% Check if the bridge can be converted to a valid bridge v1
|
||||||
|
%%
|
||||||
|
%% * The corresponding bridge v2 should exist
|
||||||
|
%% * The connector for the bridge v2 should have exactly on channel
|
||||||
|
is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
|
||||||
|
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
|
case lookup_raw_conf(BridgeV2Type, BridgeName) of
|
||||||
|
{error, _} ->
|
||||||
|
false;
|
||||||
|
#{connector := ConnectorName} ->
|
||||||
|
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
|
||||||
|
ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
|
||||||
|
{ok, Channels} = emqx_resource:get_channels(ConnectorResourceId),
|
||||||
|
case Channels of
|
||||||
|
[_Channel] ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
|
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
|
||||||
bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
|
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
|
||||||
bridge_v1_type_to_bridge_v2_type(kafka) ->
|
bridge_v1_type_to_bridge_v2_type(kafka) ->
|
||||||
kafka.
|
kafka.
|
||||||
|
|
||||||
|
@ -985,7 +1008,7 @@ connector_has_channels(BridgeV2Type, ConnectorName) ->
|
||||||
bridge_v1_id_to_connector_resource_id(BridgeId) ->
|
bridge_v1_id_to_connector_resource_id(BridgeId) ->
|
||||||
case binary:split(BridgeId, <<":">>) of
|
case binary:split(BridgeId, <<":">>) of
|
||||||
[Type, Name] ->
|
[Type, Name] ->
|
||||||
BridgeV2Type = bin(bridge_v1_type_to_bridge_v2_type(Type)),
|
BridgeV2Type = bin(?MODULE:bridge_v1_type_to_bridge_v2_type(Type)),
|
||||||
ConnectorName =
|
ConnectorName =
|
||||||
case lookup_raw_conf(BridgeV2Type, Name) of
|
case lookup_raw_conf(BridgeV2Type, Name) of
|
||||||
#{connector := Con} ->
|
#{connector := Con} ->
|
||||||
|
|
|
@ -66,7 +66,7 @@ bridge_schema() ->
|
||||||
|
|
||||||
bridge_config() ->
|
bridge_config() ->
|
||||||
#{
|
#{
|
||||||
<<"connector">> => con_name()
|
<<"connector">> => atom_to_binary(con_name())
|
||||||
}.
|
}.
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -87,6 +87,7 @@ init_per_suite(Config) ->
|
||||||
|
|
||||||
meck:new(emqx_bridge_v2, [passthrough, no_link]),
|
meck:new(emqx_bridge_v2, [passthrough, no_link]),
|
||||||
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
||||||
|
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
||||||
|
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
ok = emqx_common_test_helpers:start_apps(start_apps()),
|
ok = emqx_common_test_helpers:start_apps(start_apps()),
|
||||||
|
@ -166,3 +167,15 @@ t_create_dry_run_fail_get_channel_status(_) ->
|
||||||
t_create_dry_run_connector_does_not_exist(_) ->
|
t_create_dry_run_connector_does_not_exist(_) ->
|
||||||
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
|
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
|
||||||
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).
|
||||||
|
|
||||||
|
t_is_valid_bridge_v1(_) ->
|
||||||
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
||||||
|
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
|
||||||
|
%% Add another channel/bridge to the connector
|
||||||
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()),
|
||||||
|
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
|
||||||
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
|
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2),
|
||||||
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
|
||||||
|
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
|
||||||
|
ok.
|
||||||
|
|
|
@ -449,7 +449,7 @@ health_check(ResId) ->
|
||||||
channel_health_check(ResId, ChannelId) ->
|
channel_health_check(ResId, ChannelId) ->
|
||||||
emqx_resource_manager:channel_health_check(ResId, ChannelId).
|
emqx_resource_manager:channel_health_check(ResId, ChannelId).
|
||||||
|
|
||||||
-spec get_channels(resource_id()) -> [{binary(), map()}].
|
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]}.
|
||||||
get_channels(ResId) ->
|
get_channels(ResId) ->
|
||||||
emqx_resource_manager:get_channels(ResId).
|
emqx_resource_manager:get_channels(ResId).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue