fix(bridges_v1): don't list v2-only bridges in API
Fixes https://emqx.atlassian.net/browse/EMQX-11614
This commit is contained in:
parent
d3b32b64e1
commit
b61ccf3373
|
@ -24,6 +24,7 @@
|
||||||
action_type_to_connector_type/1,
|
action_type_to_connector_type/1,
|
||||||
action_type_to_bridge_v1_type/2,
|
action_type_to_bridge_v1_type/2,
|
||||||
bridge_v1_type_to_action_type/1,
|
bridge_v1_type_to_action_type/1,
|
||||||
|
bridge_v1_type_name/1,
|
||||||
is_action_type/1,
|
is_action_type/1,
|
||||||
registered_schema_modules/0,
|
registered_schema_modules/0,
|
||||||
connector_action_config_to_bridge_v1_config/2,
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
@ -144,6 +145,20 @@ get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
|
||||||
get_confs(_, _) ->
|
get_confs(_, _) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
%% We need this hack because of the bugs introduced by associating v2/action/source types
|
||||||
|
%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or
|
||||||
|
%% `confluent_producer', which has no v1 equivalent....
|
||||||
|
bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) ->
|
||||||
|
bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin));
|
||||||
|
bridge_v1_type_name(ActionType) ->
|
||||||
|
Module = get_action_info_module(ActionType),
|
||||||
|
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
|
||||||
|
true ->
|
||||||
|
{ok, Module:bridge_v1_type_name()};
|
||||||
|
false ->
|
||||||
|
{error, no_v1_equivalent}
|
||||||
|
end.
|
||||||
|
|
||||||
%% This function should return true for all inputs that are bridge V1 types for
|
%% This function should return true for all inputs that are bridge V1 types for
|
||||||
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
||||||
%% types. For everything else the function should return false.
|
%% types. For everything else the function should return false.
|
||||||
|
|
|
@ -621,6 +621,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_from_local_node(ActionType, ActionName) ->
|
lookup_from_local_node(ActionType, ActionName) ->
|
||||||
|
%% TODO: BUG: shouldn't accept an action type here, only V1 types....
|
||||||
case emqx_bridge:lookup(ActionType, ActionName) of
|
case emqx_bridge:lookup(ActionType, ActionName) of
|
||||||
{ok, Res} -> {ok, format_resource(Res, node())};
|
{ok, Res} -> {ok, format_resource(Res, node())};
|
||||||
Error -> Error
|
Error -> Error
|
||||||
|
|
|
@ -1086,7 +1086,8 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
|
||||||
case lookup(ActionType, Name) of
|
case lookup(ActionType, Name) of
|
||||||
{ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
|
{ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
|
||||||
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
|
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
|
||||||
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
|
HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType),
|
||||||
|
case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
|
||||||
true ->
|
true ->
|
||||||
ConnectorType = connector_type(ActionType),
|
ConnectorType = connector_type(ActionType),
|
||||||
case emqx_connector:lookup(ConnectorType, ConnectorName) of
|
case emqx_connector:lookup(ConnectorType, ConnectorName) of
|
||||||
|
@ -1112,6 +1113,12 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
|
||||||
not_bridge_v1_compatible_error() ->
|
not_bridge_v1_compatible_error() ->
|
||||||
{error, not_bridge_v1_compatible}.
|
{error, not_bridge_v1_compatible}.
|
||||||
|
|
||||||
|
has_bridge_v1_equivalent(ActionType) ->
|
||||||
|
case emqx_action_info:bridge_v1_type_name(ActionType) of
|
||||||
|
{ok, _} -> true;
|
||||||
|
{error, no_v1_equivalent} -> false
|
||||||
|
end.
|
||||||
|
|
||||||
connector_raw_config(Connector, ConnectorType) ->
|
connector_raw_config(Connector, ConnectorType) ->
|
||||||
get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).
|
get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).
|
||||||
|
|
||||||
|
|
|
@ -136,6 +136,9 @@ setup_mocks() ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
|
||||||
|
catch meck:new(emqx_action_info, MeckOpts),
|
||||||
|
meck:expect(emqx_action_info, bridge_v1_type_name, 1, {ok, bridge_type()}),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
con_mod() ->
|
con_mod() ->
|
||||||
|
|
|
@ -343,6 +343,42 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
|
||||||
ct:pal("bridge probe result: ~p", [Res]),
|
ct:pal("bridge probe result: ~p", [Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
list_bridges_http_api_v1() ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
||||||
|
ct:pal("list bridges (http v1)"),
|
||||||
|
Res = request(get, Path, _Params = []),
|
||||||
|
ct:pal("list bridges (http v1) result:\n ~p", [Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
list_actions_http_api() ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["actions"]),
|
||||||
|
ct:pal("list actions (http v2)"),
|
||||||
|
Res = request(get, Path, _Params = []),
|
||||||
|
ct:pal("list actions (http v2) result:\n ~p", [Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
list_connectors_http_api() ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
|
||||||
|
ct:pal("list connectors"),
|
||||||
|
Res = request(get, Path, _Params = []),
|
||||||
|
ct:pal("list connectors result:\n ~p", [Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
update_rule_http(RuleId, Params) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
|
||||||
|
ct:pal("update rule ~p:\n ~p", [RuleId, Params]),
|
||||||
|
Res = request(put, Path, Params),
|
||||||
|
ct:pal("update rule ~p result:\n ~p", [RuleId, Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
enable_rule_http(RuleId) ->
|
||||||
|
Params = #{<<"enable">> => true},
|
||||||
|
update_rule_http(RuleId, Params).
|
||||||
|
|
||||||
|
is_rule_enabled(RuleId) ->
|
||||||
|
{ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId),
|
||||||
|
Enable.
|
||||||
|
|
||||||
try_decode_error(Body0) ->
|
try_decode_error(Body0) ->
|
||||||
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
|
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
|
||||||
{ok, #{<<"message">> := Msg0} = Body1} ->
|
{ok, #{<<"message">> := Msg0} = Body1} ->
|
||||||
|
|
|
@ -10,8 +10,8 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-define(BRIDGE_TYPE, confluent_producer).
|
-define(ACTION_TYPE, confluent_producer).
|
||||||
-define(BRIDGE_TYPE_BIN, <<"confluent_producer">>).
|
-define(ACTION_TYPE_BIN, <<"confluent_producer">>).
|
||||||
-define(CONNECTOR_TYPE, confluent_producer).
|
-define(CONNECTOR_TYPE, confluent_producer).
|
||||||
-define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
|
-define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
|
||||||
-define(KAFKA_BRIDGE_TYPE, kafka_producer).
|
-define(KAFKA_BRIDGE_TYPE, kafka_producer).
|
||||||
|
@ -93,7 +93,7 @@ common_init_per_testcase(TestCase, Config) ->
|
||||||
{connector_type, ?CONNECTOR_TYPE},
|
{connector_type, ?CONNECTOR_TYPE},
|
||||||
{connector_name, Name},
|
{connector_name, Name},
|
||||||
{connector_config, ConnectorConfig},
|
{connector_config, ConnectorConfig},
|
||||||
{bridge_type, ?BRIDGE_TYPE},
|
{bridge_type, ?ACTION_TYPE},
|
||||||
{bridge_name, Name},
|
{bridge_name, Name},
|
||||||
{bridge_config, BridgeConfig}
|
{bridge_config, BridgeConfig}
|
||||||
| Config
|
| Config
|
||||||
|
@ -212,7 +212,7 @@ serde_roundtrip(InnerConfigMap0) ->
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
|
||||||
parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
||||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
TypeBin = ?ACTION_TYPE_BIN,
|
||||||
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
|
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
|
||||||
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
|
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
@ -341,3 +341,43 @@ t_same_name_confluent_kafka_bridges(Config) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_list_v1_bridges(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
{ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, no_v1_equivalent},
|
||||||
|
emqx_action_info:bridge_v1_type_name(confluent_producer)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, []}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_actions_http_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_connectors_http_api()
|
||||||
|
),
|
||||||
|
|
||||||
|
RuleTopic = <<"t/c">>,
|
||||||
|
{ok, #{<<"id">> := RuleId0}} =
|
||||||
|
emqx_bridge_v2_testlib:create_rule_and_action_http(
|
||||||
|
?ACTION_TYPE_BIN,
|
||||||
|
RuleTopic,
|
||||||
|
Config,
|
||||||
|
#{overrides => #{enable => true}}
|
||||||
|
),
|
||||||
|
?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, _}}, emqx_bridge_v2_testlib:enable_rule_http(RuleId0)
|
||||||
|
),
|
||||||
|
?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -621,24 +621,36 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul
|
||||||
BridgeIDs0 =
|
BridgeIDs0 =
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(BridgeID) ->
|
fun(BridgeID) ->
|
||||||
emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false})
|
%% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it
|
||||||
|
%% returns v1 types when attempting to "upgrade".....
|
||||||
|
{Type, Name} =
|
||||||
|
emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
|
||||||
|
case emqx_action_info:is_action_type(Type) of
|
||||||
|
true -> {action, Type, Name};
|
||||||
|
false -> {bridge_v1, Type, Name}
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
get_referenced_hookpoints(Froms)
|
get_referenced_hookpoints(Froms)
|
||||||
),
|
),
|
||||||
BridgeIDs1 =
|
BridgeIDs1 =
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun
|
fun
|
||||||
({bridge_v2, Type, Name}) -> {true, {Type, Name}};
|
({bridge_v2, Type, Name}) -> {true, {action, Type, Name}};
|
||||||
({bridge, Type, Name, _ResId}) -> {true, {Type, Name}};
|
({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}};
|
||||||
(_) -> false
|
(_) -> false
|
||||||
end,
|
end,
|
||||||
Actions
|
Actions
|
||||||
),
|
),
|
||||||
NonExistentBridgeIDs =
|
NonExistentBridgeIDs =
|
||||||
lists:filter(
|
lists:filter(
|
||||||
fun({Type, Name}) ->
|
fun({Kind, Type, Name}) ->
|
||||||
|
LookupFn =
|
||||||
|
case Kind of
|
||||||
|
action -> fun emqx_bridge_v2:lookup/2;
|
||||||
|
bridge_v1 -> fun emqx_bridge:lookup/2
|
||||||
|
end,
|
||||||
try
|
try
|
||||||
case emqx_bridge:lookup(Type, Name) of
|
case LookupFn(Type, Name) of
|
||||||
{ok, _} -> false;
|
{ok, _} -> false;
|
||||||
{error, _} -> true
|
{error, _} -> true
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue