From ae828e8cfbce3882b3c88a4335e826ea3cde3752 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 18 Jul 2024 17:17:11 -0300 Subject: [PATCH 1/3] feat(connectors api): add dependent actions and sources to response Fixes https://emqx.atlassian.net/browse/EMQX-12654 --- .../test/emqx_bridge_v2_api_SUITE.erl | 153 +++++++++++++++++- .../emqx_connector/src/emqx_connector_api.erl | 23 ++- .../test/emqx_connector_api_SUITE.erl | 7 +- 3 files changed, 169 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 039402738..bf19b364e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -252,23 +252,26 @@ init_per_testcase(TestCase, Config) when BridgeConfig | Config ]; -init_per_testcase(_TestCase, Config) -> +init_per_testcase(TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> init_mocks(); Nodes -> [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] end, + ShouldCreateConnector = not lists:member(TestCase, skip_connector_creation_test_cases()), case ?config(bridge_kind, Config) of - action -> + action when ShouldCreateConnector -> {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config); - source -> + source when ShouldCreateConnector -> {ok, 201, _} = request( post, uri(["connectors"]), source_connector_create_config(#{}), Config - ) + ); + _ -> + ok end, Config. @@ -284,6 +287,11 @@ end_per_testcase(_TestCase, Config) -> ok = emqx_common_test_helpers:call_janitor(), ok. +skip_connector_creation_test_cases() -> + [ + t_connector_dependencies + ]. + %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ @@ -500,6 +508,23 @@ source_config_base() -> } }. +mqtt_action_config_base() -> + source_config_base(). + +mqtt_action_create_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + Conf0 = maps:merge( + mqtt_action_config_base(), + #{ + <<"enable">> => true, + <<"type">> => ?SOURCE_TYPE + } + ), + emqx_utils_maps:deep_merge( + Conf0, + Overrides + ). + source_create_config(Overrides0) -> Overrides = emqx_utils_maps:binary_key_map(Overrides0), Conf0 = maps:merge( @@ -575,6 +600,32 @@ maybe_get_other_node(Config) -> OtherNode end. +list_connectors_api() -> + Res = emqx_bridge_v2_testlib:list_connectors_http_api(), + emqx_mgmt_api_test_util:simplify_result(Res). + +get_connector_api(Type, Name) -> + Res = emqx_bridge_v2_testlib:get_connector_api(Type, Name), + emqx_mgmt_api_test_util:simplify_result(Res). + +create_source_api(Name, Type, Params) -> + Res = emqx_bridge_v2_testlib:create_kind_api([ + {bridge_kind, source}, + {source_type, Type}, + {source_name, Name}, + {source_config, Params} + ]), + emqx_mgmt_api_test_util:simplify_result(Res). + +create_action_api(Name, Type, Params) -> + Res = emqx_bridge_v2_testlib:create_kind_api([ + {bridge_kind, action}, + {action_type, Type}, + {action_name, Name}, + {action_config, Params} + ]), + emqx_mgmt_api_test_util:simplify_result(Res). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1598,3 +1649,97 @@ t_start_action_or_source_with_disabled_connector(matrix) -> t_start_action_or_source_with_disabled_connector(Config) -> ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok. + +%% Verifies that listing connectors return the actions and sources that depend on the +%% connector +t_connector_dependencies(matrix) -> + [ + [single, actions], + [single, sources] + ]; +t_connector_dependencies(Config) when is_list(Config) -> + ?check_trace( + begin + %% This particular source type happens to serve both actions and sources, a + %% nice edge case for this test. + ActionType = ?SOURCE_TYPE, + ConnectorType = ?SOURCE_CONNECTOR_TYPE, + ConnectorName = <<"c">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_connector_api([ + {connector_config, source_connector_create_config(#{})}, + {connector_name, ConnectorName}, + {connector_type, ConnectorType} + ]), + ?assertMatch( + {200, [ + #{ + <<"actions">> := [], + <<"sources">> := [] + } + ]}, + list_connectors_api() + ), + ?assertMatch( + {200, #{ + <<"actions">> := [], + <<"sources">> := [] + }}, + get_connector_api(ConnectorType, ConnectorName) + ), + + SourceName1 = <<"s1">>, + {201, _} = create_source_api( + SourceName1, + ?SOURCE_TYPE, + source_create_config(#{ + <<"connector">> => ConnectorName + }) + ), + ?assertMatch( + {200, [ + #{ + <<"actions">> := [], + <<"sources">> := [SourceName1] + } + ]}, + list_connectors_api() + ), + ?assertMatch( + {200, #{ + <<"actions">> := [], + <<"sources">> := [SourceName1] + }}, + get_connector_api(ConnectorType, ConnectorName) + ), + + ActionName1 = <<"a1">>, + {201, _} = create_action_api( + ActionName1, + ActionType, + mqtt_action_create_config(#{ + <<"connector">> => ConnectorName + }) + ), + ?assertMatch( + {200, [ + #{ + <<"actions">> := [ActionName1], + <<"sources">> := [SourceName1] + } + ]}, + list_connectors_api() + ), + ?assertMatch( + {200, #{ + <<"actions">> := [ActionName1], + <<"sources">> := [SourceName1] + }}, + get_connector_api(ConnectorType, ConnectorName) + ), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 4c7a0476e..c1e0b8ec2 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -655,7 +655,22 @@ format_resource_data(error, undefined, Result) -> format_resource_data(error, Error, Result) -> Result#{status_reason => emqx_utils:readable_error_msg(Error)}; format_resource_data(channels, Channels, Result) -> - Result#{actions => lists:map(fun format_action/1, maps:keys(Channels))}; + #{ + actions := Actions, + sources := Sources + } = lists:foldl( + fun(Id, Acc) -> + case emqx_bridge_v2:parse_id(Id) of + #{kind := source, name := Name} -> + maps:update_with(sources, fun(Ss) -> [Name | Ss] end, Acc); + #{name := Name} -> + maps:update_with(actions, fun(As) -> [Name | As] end, Acc) + end + end, + #{actions => [], sources => []}, + maps:keys(Channels) + ), + Result#{actions => lists:sort(Actions), sources => lists:sort(Sources)}; format_resource_data(K, V, Result) -> Result#{K => V}. @@ -673,12 +688,6 @@ unpack_connector_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), RawConf. -format_action(ActionId) -> - case emqx_bridge_v2:parse_id(ActionId) of - #{name := Name} -> - Name - end. - is_ok(ok) -> ok; is_ok(OkResult = {ok, _}) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f3e91ef12..31069b075 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -109,9 +109,10 @@ emqx_conf, emqx, emqx_auth, - emqx_management, - {emqx_connector, "connectors {}"}, - {emqx_bridge, "actions {}"} + emqx_connector, + emqx_bridge, + emqx_rule_engine, + emqx_management ]). -define(APPSPEC_DASHBOARD, From 4d174b867816f6a726f19361aa3eb1035b41b44e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 18 Jul 2024 17:18:03 -0300 Subject: [PATCH 2/3] feat(sources & actions api): add dependent rules to response Fixes https://emqx.atlassian.net/browse/EMQX-12654 --- .../include/emqx_bridge_resource.hrl | 1 + apps/emqx_bridge/src/emqx_bridge_resource.erl | 2 + apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 22 ++- .../test/emqx_bridge_v2_api_SUITE.erl | 166 +++++++++++++++++- .../test/emqx_bridge_hstreamdb_SUITE.erl | 1 + .../emqx_rule_engine/src/emqx_rule_engine.erl | 22 +++ 6 files changed, 206 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge_resource.hrl b/apps/emqx_bridge/include/emqx_bridge_resource.hrl index 564eb8902..a74bd3bed 100644 --- a/apps/emqx_bridge/include/emqx_bridge_resource.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_resource.hrl @@ -18,5 +18,6 @@ -define(EMQX_BRIDGE_RESOURCE_HRL, true). -define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>). +-define(SOURCE_HOOKPOINT(BridgeId), <<"$sources/", BridgeId/binary>>). -endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d2408ca73..9215e0787 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -132,6 +132,8 @@ bridge_hookpoint(BridgeId) -> bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) -> {ok, BridgeId}; +bridge_hookpoint_to_bridge_id(?SOURCE_HOOKPOINT(BridgeId)) -> + {ok, BridgeId}; bridge_hookpoint_to_bridge_id(_) -> {error, bad_bridge_hookpoint}. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 89b7f6e17..a1744a7d1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -790,7 +790,7 @@ handle_list(ConfRootKey) -> [format_resource(ConfRootKey, Data, Node) || Data <- Bridges] || {Node, Bridges} <- lists:zip(Nodes, NodeBridges) ], - ?OK(zip_bridges(AllBridges)); + ?OK(zip_bridges(ConfRootKey, AllBridges)); {error, Reason} -> ?INTERNAL_ERROR(Reason) end. @@ -987,8 +987,9 @@ lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) -> ) ) of - {ok, [{ok, _} | _] = Results} -> - {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; + {ok, [{ok, _} | _] = Results0} -> + Results = [R || {ok, R} <- Results0], + {SuccCode, format_bridge_info(ConfRootKey, BridgeType, BridgeName, Results)}; {ok, [{error, not_found} | _]} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); {error, Reason} -> @@ -1146,11 +1147,11 @@ maybe_unwrap({error, not_implemented}) -> maybe_unwrap(RpcMulticallResult) -> emqx_rpc:unwrap_erpc(RpcMulticallResult). -zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> +zip_bridges(ConfRootKey, [BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), - [format_bridge_info(Bridges) | Acc] + [format_bridge_info(ConfRootKey, Type, Name, Bridges) | Acc] end, [], BridgesFirstNode @@ -1184,12 +1185,19 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> BridgesAllNodes ). -format_bridge_info([FirstBridge | _] = Bridges) -> +format_bridge_info(ConfRootKey, Type, Name, [FirstBridge | _] = Bridges) -> Res = maps:remove(node, FirstBridge), NodeStatus = node_status(Bridges), + Id = emqx_bridge_resource:bridge_id(Type, Name), + Rules = + case ConfRootKey of + actions -> emqx_rule_engine:get_rule_ids_by_bridge_action(Id); + sources -> emqx_rule_engine:get_rule_ids_by_bridge_source(Id) + end, redact(Res#{ status => aggregate_status(NodeStatus), - node_status => NodeStatus + node_status => NodeStatus, + rules => lists:sort(Rules) }). node_status(Bridges) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index bf19b364e..27db7486f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -289,7 +289,8 @@ end_per_testcase(_TestCase, Config) -> skip_connector_creation_test_cases() -> [ - t_connector_dependencies + t_connector_dependencies, + t_kind_dependencies ]. %%------------------------------------------------------------------------------ @@ -608,6 +609,14 @@ get_connector_api(Type, Name) -> Res = emqx_bridge_v2_testlib:get_connector_api(Type, Name), emqx_mgmt_api_test_util:simplify_result(Res). +get_source_api(Type, Name) -> + Res = emqx_bridge_v2_testlib:get_bridge_api(source, Type, Name), + emqx_mgmt_api_test_util:simplify_result(Res). + +get_action_api(Type, Name) -> + Res = emqx_bridge_v2_testlib:get_bridge_api(action, Type, Name), + emqx_mgmt_api_test_util:simplify_result(Res). + create_source_api(Name, Type, Params) -> Res = emqx_bridge_v2_testlib:create_kind_api([ {bridge_kind, source}, @@ -626,6 +635,43 @@ create_action_api(Name, Type, Params) -> ]), emqx_mgmt_api_test_util:simplify_result(Res). +list_sources_api() -> + Res = emqx_bridge_v2_testlib:list_sources_http_api(), + emqx_mgmt_api_test_util:simplify_result(Res). + +list_actions_api() -> + Res = emqx_bridge_v2_testlib:list_actions_http_api(), + emqx_mgmt_api_test_util:simplify_result(Res). + +create_action_rule(ActionType, ActionName) -> + RuleTopic = <<"t/", ActionName/binary>>, + Config = [{action_name, ActionName}], + emqx_bridge_v2_testlib:create_rule_and_action_http(ActionType, RuleTopic, Config). + +create_source_rule1(SourceType, SourceName) -> + RuleTopic = <<"t/", SourceName/binary>>, + Config = [{action_name, <<"unused">>}], + Id = emqx_bridge_resource:bridge_id(SourceType, SourceName), + Opts = #{ + overrides => #{ + sql => <<"select * from \"$bridges/", Id/binary, "\"">>, + actions => [] + } + }, + emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts). + +create_source_rule2(SourceType, SourceName) -> + RuleTopic = <<"t/", SourceName/binary>>, + Config = [{action_name, <<"unused">>}], + Id = emqx_bridge_resource:bridge_id(SourceType, SourceName), + Opts = #{ + overrides => #{ + sql => <<"select * from \"$sources/", Id/binary, "\"">>, + actions => [] + } + }, + emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1743,3 +1789,121 @@ t_connector_dependencies(Config) when is_list(Config) -> [] ), ok. + +%% Verifies that listing actions/sources return the rules that depend on them. +t_kind_dependencies(matrix) -> + [ + [single, actions], + [single, sources] + ]; +t_kind_dependencies(Config) when is_list(Config) -> + ?check_trace( + begin + %% This particular source type happens to serve both actions and sources, a + %% nice edge case for this test. + ActionType = ?SOURCE_TYPE, + SourceType = ?SOURCE_TYPE, + ConnectorType = ?SOURCE_CONNECTOR_TYPE, + ConnectorName = <<"c">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_connector_api([ + {connector_config, source_connector_create_config(#{})}, + {connector_name, ConnectorName}, + {connector_type, ConnectorType} + ]), + + ActionName1 = <<"a1">>, + {201, _} = create_action_api( + ActionName1, + ActionType, + mqtt_action_create_config(#{ + <<"connector">> => ConnectorName + }) + ), + ?assertMatch( + {200, [#{<<"rules">> := []}]}, + list_actions_api() + ), + ?assertMatch( + {200, #{<<"rules">> := []}}, + get_action_api(ActionType, ActionName1) + ), + + {ok, #{<<"id">> := RuleId1}} = create_action_rule(ActionType, ActionName1), + + ?assertMatch( + {200, [#{<<"rules">> := [RuleId1]}]}, + list_actions_api() + ), + ?assertMatch( + {200, #{<<"rules">> := [RuleId1]}}, + get_action_api(ActionType, ActionName1) + ), + ?assertMatch( + {200, []}, + list_sources_api() + ), + + SourceName1 = <<"s1">>, + {201, _} = create_source_api( + SourceName1, + ?SOURCE_TYPE, + source_create_config(#{ + <<"connector">> => ConnectorName + }) + ), + ?assertMatch( + {200, [#{<<"rules">> := []}]}, + list_sources_api() + ), + ?assertMatch( + {200, #{<<"rules">> := []}}, + get_source_api(SourceType, SourceName1) + ), + %% Action remains untouched + ?assertMatch( + {200, [#{<<"rules">> := [RuleId1]}]}, + list_actions_api() + ), + ?assertMatch( + {200, #{<<"rules">> := [RuleId1]}}, + get_action_api(ActionType, ActionName1) + ), + + %% using "$bridges/..." hookpoint + {ok, #{<<"id">> := RuleId2}} = create_source_rule1(SourceType, SourceName1), + ?assertMatch( + {200, [#{<<"rules">> := [RuleId2]}]}, + list_sources_api() + ), + ?assertMatch( + {200, #{<<"rules">> := [RuleId2]}}, + get_source_api(SourceType, SourceName1) + ), + %% Action remains untouched + ?assertMatch( + {200, [#{<<"rules">> := [RuleId1]}]}, + list_actions_api() + ), + + %% using "$sources/..." hookpoint + {ok, #{<<"id">> := RuleId3}} = create_source_rule2(SourceType, SourceName1), + ?assertMatch( + {200, [#{<<"rules">> := [RuleId1]}]}, + list_actions_api() + ), + Rules = lists:sort([RuleId2, RuleId3]), + ?assertMatch( + {200, [#{<<"rules">> := Rules}]}, + list_sources_api() + ), + ?assertMatch( + {200, #{<<"rules">> := Rules}}, + get_source_api(SourceType, SourceName1) + ), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl index 6762e4e50..b7605e6d8 100644 --- a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -415,6 +415,7 @@ common_init(ConfigT) -> emqx_conf, emqx_bridge_hstreamdb, emqx_bridge, + emqx_rule_engine, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ], diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 2adf511a2..359984228 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -47,6 +47,8 @@ get_rules_for_topic/1, get_rules_with_same_event/1, get_rule_ids_by_action/1, + get_rule_ids_by_bridge_action/1, + get_rule_ids_by_bridge_source/1, ensure_action_removed/2, get_rules_ordered_by_ts/0 ]). @@ -108,6 +110,8 @@ -define(RATE_METRICS, ['matched']). -type action_name() :: binary() | #{function := binary()}. +-type bridge_action_id() :: binary(). +-type bridge_source_id() :: binary(). -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> @@ -255,6 +259,24 @@ get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> contains_actions(Acts, Mod, Fun) ]. +-spec get_rule_ids_by_bridge_action(bridge_action_id()) -> [binary()]. +get_rule_ids_by_bridge_action(ActionId) -> + %% ActionId = <<"type:name">> + [ + Id + || #{actions := Acts, id := Id} <- get_rules(), + forwards_to_bridge(Acts, ActionId) + ]. + +-spec get_rule_ids_by_bridge_source(bridge_source_id()) -> [binary()]. +get_rule_ids_by_bridge_source(SourceId) -> + %% SourceId = <<"type:name">> + [ + Id + || #{from := Froms, id := Id} <- get_rules(), + references_ingress_bridge(Froms, SourceId) + ]. + -spec ensure_action_removed(rule_id(), action_name()) -> ok. ensure_action_removed(RuleId, ActionName) -> FilterFunc = From d7e72808a8f144c536c40351377bb5cf311abbfc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 19 Jul 2024 10:02:17 -0300 Subject: [PATCH 3/3] docs: add changelog --- changes/ce/feat-13492.en.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ce/feat-13492.en.md diff --git a/changes/ce/feat-13492.en.md b/changes/ce/feat-13492.en.md new file mode 100644 index 000000000..e949157a9 --- /dev/null +++ b/changes/ce/feat-13492.en.md @@ -0,0 +1,3 @@ +The lists of actions and sources that depend on a given connector are now returned in the `GET /connectors` and `GET /connectors/:id` APIs. + +The list of rules that depend on a given action or source is now returned in the `GET /actions`, `GET /sources`, `GET /actions/:id` and `GET /sources/:id` APIs.