From b61ccf3373f486b02047e82cb0b1fbd55d96e266 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Dec 2023 09:45:56 -0300 Subject: [PATCH] fix(bridges_v1): don't list v2-only bridges in API Fixes https://emqx.atlassian.net/browse/EMQX-11614 --- apps/emqx_bridge/src/emqx_action_info.erl | 15 ++++++ apps/emqx_bridge/src/emqx_bridge_api.erl | 1 + apps/emqx_bridge/src/emqx_bridge_v2.erl | 9 +++- ...qx_bridge_v1_compatibility_layer_SUITE.erl | 3 ++ .../test/emqx_bridge_v2_testlib.erl | 36 ++++++++++++++ .../emqx_bridge_confluent_producer_SUITE.erl | 48 +++++++++++++++++-- .../emqx_rule_engine/src/emqx_rule_engine.erl | 22 +++++++-- 7 files changed, 124 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index c6304d9f7..e9c51edfa 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -24,6 +24,7 @@ action_type_to_connector_type/1, action_type_to_bridge_v1_type/2, bridge_v1_type_to_action_type/1, + bridge_v1_type_name/1, is_action_type/1, registered_schema_modules/0, connector_action_config_to_bridge_v1_config/2, @@ -144,6 +145,20 @@ get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> get_confs(_, _) -> undefined. +%% We need this hack because of the bugs introduced by associating v2/action/source types +%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or +%% `confluent_producer', which has no v1 equivalent.... +bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) -> + bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin)); +bridge_v1_type_name(ActionType) -> + Module = get_action_info_module(ActionType), + case erlang:function_exported(Module, bridge_v1_type_name, 0) of + true -> + {ok, Module:bridge_v1_type_name()}; + false -> + {error, no_v1_equivalent} + end. + %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index edc8da113..1595f040d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -621,6 +621,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> end. lookup_from_local_node(ActionType, ActionName) -> + %% TODO: BUG: shouldn't accept an action type here, only V1 types.... case emqx_bridge:lookup(ActionType, ActionName) of {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 00d3ef6ec..9436ac0ea 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1086,7 +1086,8 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> case lookup(ActionType, Name) of {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig), - case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of + HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType), + case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of true -> ConnectorType = connector_type(ActionType), case emqx_connector:lookup(ConnectorType, ConnectorName) of @@ -1112,6 +1113,12 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> not_bridge_v1_compatible_error() -> {error, not_bridge_v1_compatible}. +has_bridge_v1_equivalent(ActionType) -> + case emqx_action_info:bridge_v1_type_name(ActionType) of + {ok, _} -> true; + {error, no_v1_equivalent} -> false + end. + connector_raw_config(Connector, ConnectorType) -> get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema). diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index b268c127d..dadc0a09c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -136,6 +136,9 @@ setup_mocks() -> end ), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect(emqx_action_info, bridge_v1_type_name, 1, {ok, bridge_type()}), + ok. con_mod() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index e56ead313..1f5373b70 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -343,6 +343,42 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> ct:pal("bridge probe result: ~p", [Res]), Res. +list_bridges_http_api_v1() -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + ct:pal("list bridges (http v1)"), + Res = request(get, Path, _Params = []), + ct:pal("list bridges (http v1) result:\n ~p", [Res]), + Res. + +list_actions_http_api() -> + Path = emqx_mgmt_api_test_util:api_path(["actions"]), + ct:pal("list actions (http v2)"), + Res = request(get, Path, _Params = []), + ct:pal("list actions (http v2) result:\n ~p", [Res]), + Res. + +list_connectors_http_api() -> + Path = emqx_mgmt_api_test_util:api_path(["connectors"]), + ct:pal("list connectors"), + Res = request(get, Path, _Params = []), + ct:pal("list connectors result:\n ~p", [Res]), + Res. + +update_rule_http(RuleId, Params) -> + Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]), + ct:pal("update rule ~p:\n ~p", [RuleId, Params]), + Res = request(put, Path, Params), + ct:pal("update rule ~p result:\n ~p", [RuleId, Res]), + Res. + +enable_rule_http(RuleId) -> + Params = #{<<"enable">> => true}, + update_rule_http(RuleId, Params). + +is_rule_enabled(RuleId) -> + {ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId), + Enable. + try_decode_error(Body0) -> case emqx_utils_json:safe_decode(Body0, [return_maps]) of {ok, #{<<"message">> := Msg0} = Body1} -> diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 2977f72cf..420da1275 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -10,8 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(BRIDGE_TYPE, confluent_producer). --define(BRIDGE_TYPE_BIN, <<"confluent_producer">>). +-define(ACTION_TYPE, confluent_producer). +-define(ACTION_TYPE_BIN, <<"confluent_producer">>). -define(CONNECTOR_TYPE, confluent_producer). -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka_producer). @@ -93,7 +93,7 @@ common_init_per_testcase(TestCase, Config) -> {connector_type, ?CONNECTOR_TYPE}, {connector_name, Name}, {connector_config, ConnectorConfig}, - {bridge_type, ?BRIDGE_TYPE}, + {bridge_type, ?ACTION_TYPE}, {bridge_name, Name}, {bridge_config, BridgeConfig} | Config @@ -212,7 +212,7 @@ serde_roundtrip(InnerConfigMap0) -> InnerConfigMap. parse_and_check_bridge_config(InnerConfigMap, Name) -> - TypeBin = ?BRIDGE_TYPE_BIN, + TypeBin = ?ACTION_TYPE_BIN, RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}}, hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}), InnerConfigMap. @@ -341,3 +341,43 @@ t_same_name_confluent_kafka_bridges(Config) -> end ), ok. + +t_list_v1_bridges(Config) -> + ?check_trace( + begin + {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config), + + ?assertMatch( + {error, no_v1_equivalent}, + emqx_action_info:bridge_v1_type_name(confluent_producer) + ), + + ?assertMatch( + {ok, {{_, 200, _}, _, []}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_actions_http_api() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_connectors_http_api() + ), + + RuleTopic = <<"t/c">>, + {ok, #{<<"id">> := RuleId0}} = + emqx_bridge_v2_testlib:create_rule_and_action_http( + ?ACTION_TYPE_BIN, + RuleTopic, + Config, + #{overrides => #{enable => true}} + ), + ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)), + ?assertMatch( + {ok, {{_, 200, _}, _, _}}, emqx_bridge_v2_testlib:enable_rule_http(RuleId0) + ), + ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index afa57dfac..70a7fc32c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -621,24 +621,36 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul BridgeIDs0 = lists:map( fun(BridgeID) -> - emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}) + %% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it + %% returns v1 types when attempting to "upgrade"..... + {Type, Name} = + emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}), + case emqx_action_info:is_action_type(Type) of + true -> {action, Type, Name}; + false -> {bridge_v1, Type, Name} + end end, get_referenced_hookpoints(Froms) ), BridgeIDs1 = lists:filtermap( fun - ({bridge_v2, Type, Name}) -> {true, {Type, Name}}; - ({bridge, Type, Name, _ResId}) -> {true, {Type, Name}}; + ({bridge_v2, Type, Name}) -> {true, {action, Type, Name}}; + ({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}}; (_) -> false end, Actions ), NonExistentBridgeIDs = lists:filter( - fun({Type, Name}) -> + fun({Kind, Type, Name}) -> + LookupFn = + case Kind of + action -> fun emqx_bridge_v2:lookup/2; + bridge_v1 -> fun emqx_bridge:lookup/2 + end, try - case emqx_bridge:lookup(Type, Name) of + case LookupFn(Type, Name) of {ok, _} -> false; {error, _} -> true end