Merge pull request #13492 from thalesmg/20240718-m-rules-conn-deps

feat: return dependent entities in connectors/actions/sources API
This commit is contained in:
Thales Macedo Garitezi 2024-07-24 09:16:00 -03:00 committed by GitHub
commit 9a950571d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 377 additions and 21 deletions

View File

@ -18,5 +18,6 @@
-define(EMQX_BRIDGE_RESOURCE_HRL, true). -define(EMQX_BRIDGE_RESOURCE_HRL, true).
-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>). -define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>).
-define(SOURCE_HOOKPOINT(BridgeId), <<"$sources/", BridgeId/binary>>).
-endif. -endif.

View File

@ -132,6 +132,8 @@ bridge_hookpoint(BridgeId) ->
bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) -> bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
{ok, BridgeId}; {ok, BridgeId};
bridge_hookpoint_to_bridge_id(?SOURCE_HOOKPOINT(BridgeId)) ->
{ok, BridgeId};
bridge_hookpoint_to_bridge_id(_) -> bridge_hookpoint_to_bridge_id(_) ->
{error, bad_bridge_hookpoint}. {error, bad_bridge_hookpoint}.

View File

@ -790,7 +790,7 @@ handle_list(ConfRootKey) ->
[format_resource(ConfRootKey, Data, Node) || Data <- Bridges] [format_resource(ConfRootKey, Data, Node) || Data <- Bridges]
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges) || {Node, Bridges} <- lists:zip(Nodes, NodeBridges)
], ],
?OK(zip_bridges(AllBridges)); ?OK(zip_bridges(ConfRootKey, AllBridges));
{error, Reason} -> {error, Reason} ->
?INTERNAL_ERROR(Reason) ?INTERNAL_ERROR(Reason)
end. end.
@ -987,8 +987,9 @@ lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
) )
) )
of of
{ok, [{ok, _} | _] = Results} -> {ok, [{ok, _} | _] = Results0} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])}; Results = [R || {ok, R} <- Results0],
{SuccCode, format_bridge_info(ConfRootKey, BridgeType, BridgeName, Results)};
{ok, [{error, not_found} | _]} -> {ok, [{error, not_found} | _]} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName); ?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, Reason} -> {error, Reason} ->
@ -1146,11 +1147,11 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) -> maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult). emqx_rpc:unwrap_erpc(RpcMulticallResult).
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> zip_bridges(ConfRootKey, [BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name}, Acc) -> fun(#{type := Type, name := Name}, Acc) ->
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
[format_bridge_info(Bridges) | Acc] [format_bridge_info(ConfRootKey, Type, Name, Bridges) | Acc]
end, end,
[], [],
BridgesFirstNode BridgesFirstNode
@ -1184,12 +1185,19 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
BridgesAllNodes BridgesAllNodes
). ).
format_bridge_info([FirstBridge | _] = Bridges) -> format_bridge_info(ConfRootKey, Type, Name, [FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge), Res = maps:remove(node, FirstBridge),
NodeStatus = node_status(Bridges), NodeStatus = node_status(Bridges),
Id = emqx_bridge_resource:bridge_id(Type, Name),
Rules =
case ConfRootKey of
actions -> emqx_rule_engine:get_rule_ids_by_bridge_action(Id);
sources -> emqx_rule_engine:get_rule_ids_by_bridge_source(Id)
end,
redact(Res#{ redact(Res#{
status => aggregate_status(NodeStatus), status => aggregate_status(NodeStatus),
node_status => NodeStatus node_status => NodeStatus,
rules => lists:sort(Rules)
}). }).
node_status(Bridges) -> node_status(Bridges) ->

View File

@ -252,23 +252,26 @@ init_per_testcase(TestCase, Config) when
BridgeConfig BridgeConfig
| Config | Config
]; ];
init_per_testcase(_TestCase, Config) -> init_per_testcase(TestCase, Config) ->
case ?config(cluster_nodes, Config) of case ?config(cluster_nodes, Config) of
undefined -> undefined ->
init_mocks(); init_mocks();
Nodes -> Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
end, end,
ShouldCreateConnector = not lists:member(TestCase, skip_connector_creation_test_cases()),
case ?config(bridge_kind, Config) of case ?config(bridge_kind, Config) of
action -> action when ShouldCreateConnector ->
{ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config); {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config);
source -> source when ShouldCreateConnector ->
{ok, 201, _} = request( {ok, 201, _} = request(
post, post,
uri(["connectors"]), uri(["connectors"]),
source_connector_create_config(#{}), source_connector_create_config(#{}),
Config Config
) );
_ ->
ok
end, end,
Config. Config.
@ -284,6 +287,12 @@ end_per_testcase(_TestCase, Config) ->
ok = emqx_common_test_helpers:call_janitor(), ok = emqx_common_test_helpers:call_janitor(),
ok. ok.
skip_connector_creation_test_cases() ->
[
t_connector_dependencies,
t_kind_dependencies
].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -500,6 +509,23 @@ source_config_base() ->
} }
}. }.
mqtt_action_config_base() ->
source_config_base().
mqtt_action_create_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
Conf0 = maps:merge(
mqtt_action_config_base(),
#{
<<"enable">> => true,
<<"type">> => ?SOURCE_TYPE
}
),
emqx_utils_maps:deep_merge(
Conf0,
Overrides
).
source_create_config(Overrides0) -> source_create_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0), Overrides = emqx_utils_maps:binary_key_map(Overrides0),
Conf0 = maps:merge( Conf0 = maps:merge(
@ -575,6 +601,77 @@ maybe_get_other_node(Config) ->
OtherNode OtherNode
end. end.
list_connectors_api() ->
Res = emqx_bridge_v2_testlib:list_connectors_http_api(),
emqx_mgmt_api_test_util:simplify_result(Res).
get_connector_api(Type, Name) ->
Res = emqx_bridge_v2_testlib:get_connector_api(Type, Name),
emqx_mgmt_api_test_util:simplify_result(Res).
get_source_api(Type, Name) ->
Res = emqx_bridge_v2_testlib:get_bridge_api(source, Type, Name),
emqx_mgmt_api_test_util:simplify_result(Res).
get_action_api(Type, Name) ->
Res = emqx_bridge_v2_testlib:get_bridge_api(action, Type, Name),
emqx_mgmt_api_test_util:simplify_result(Res).
create_source_api(Name, Type, Params) ->
Res = emqx_bridge_v2_testlib:create_kind_api([
{bridge_kind, source},
{source_type, Type},
{source_name, Name},
{source_config, Params}
]),
emqx_mgmt_api_test_util:simplify_result(Res).
create_action_api(Name, Type, Params) ->
Res = emqx_bridge_v2_testlib:create_kind_api([
{bridge_kind, action},
{action_type, Type},
{action_name, Name},
{action_config, Params}
]),
emqx_mgmt_api_test_util:simplify_result(Res).
list_sources_api() ->
Res = emqx_bridge_v2_testlib:list_sources_http_api(),
emqx_mgmt_api_test_util:simplify_result(Res).
list_actions_api() ->
Res = emqx_bridge_v2_testlib:list_actions_http_api(),
emqx_mgmt_api_test_util:simplify_result(Res).
create_action_rule(ActionType, ActionName) ->
RuleTopic = <<"t/", ActionName/binary>>,
Config = [{action_name, ActionName}],
emqx_bridge_v2_testlib:create_rule_and_action_http(ActionType, RuleTopic, Config).
create_source_rule1(SourceType, SourceName) ->
RuleTopic = <<"t/", SourceName/binary>>,
Config = [{action_name, <<"unused">>}],
Id = emqx_bridge_resource:bridge_id(SourceType, SourceName),
Opts = #{
overrides => #{
sql => <<"select * from \"$bridges/", Id/binary, "\"">>,
actions => []
}
},
emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts).
create_source_rule2(SourceType, SourceName) ->
RuleTopic = <<"t/", SourceName/binary>>,
Config = [{action_name, <<"unused">>}],
Id = emqx_bridge_resource:bridge_id(SourceType, SourceName),
Opts = #{
overrides => #{
sql => <<"select * from \"$sources/", Id/binary, "\"">>,
actions => []
}
},
emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -1598,3 +1695,215 @@ t_start_action_or_source_with_disabled_connector(matrix) ->
t_start_action_or_source_with_disabled_connector(Config) -> t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok. ok.
%% Verifies that listing connectors return the actions and sources that depend on the
%% connector
t_connector_dependencies(matrix) ->
[
[single, actions],
[single, sources]
];
t_connector_dependencies(Config) when is_list(Config) ->
?check_trace(
begin
%% This particular source type happens to serve both actions and sources, a
%% nice edge case for this test.
ActionType = ?SOURCE_TYPE,
ConnectorType = ?SOURCE_CONNECTOR_TYPE,
ConnectorName = <<"c">>,
{ok, {{_, 201, _}, _, _}} =
emqx_bridge_v2_testlib:create_connector_api([
{connector_config, source_connector_create_config(#{})},
{connector_name, ConnectorName},
{connector_type, ConnectorType}
]),
?assertMatch(
{200, [
#{
<<"actions">> := [],
<<"sources">> := []
}
]},
list_connectors_api()
),
?assertMatch(
{200, #{
<<"actions">> := [],
<<"sources">> := []
}},
get_connector_api(ConnectorType, ConnectorName)
),
SourceName1 = <<"s1">>,
{201, _} = create_source_api(
SourceName1,
?SOURCE_TYPE,
source_create_config(#{
<<"connector">> => ConnectorName
})
),
?assertMatch(
{200, [
#{
<<"actions">> := [],
<<"sources">> := [SourceName1]
}
]},
list_connectors_api()
),
?assertMatch(
{200, #{
<<"actions">> := [],
<<"sources">> := [SourceName1]
}},
get_connector_api(ConnectorType, ConnectorName)
),
ActionName1 = <<"a1">>,
{201, _} = create_action_api(
ActionName1,
ActionType,
mqtt_action_create_config(#{
<<"connector">> => ConnectorName
})
),
?assertMatch(
{200, [
#{
<<"actions">> := [ActionName1],
<<"sources">> := [SourceName1]
}
]},
list_connectors_api()
),
?assertMatch(
{200, #{
<<"actions">> := [ActionName1],
<<"sources">> := [SourceName1]
}},
get_connector_api(ConnectorType, ConnectorName)
),
ok
end,
[]
),
ok.
%% Verifies that listing actions/sources return the rules that depend on them.
t_kind_dependencies(matrix) ->
[
[single, actions],
[single, sources]
];
t_kind_dependencies(Config) when is_list(Config) ->
?check_trace(
begin
%% This particular source type happens to serve both actions and sources, a
%% nice edge case for this test.
ActionType = ?SOURCE_TYPE,
SourceType = ?SOURCE_TYPE,
ConnectorType = ?SOURCE_CONNECTOR_TYPE,
ConnectorName = <<"c">>,
{ok, {{_, 201, _}, _, _}} =
emqx_bridge_v2_testlib:create_connector_api([
{connector_config, source_connector_create_config(#{})},
{connector_name, ConnectorName},
{connector_type, ConnectorType}
]),
ActionName1 = <<"a1">>,
{201, _} = create_action_api(
ActionName1,
ActionType,
mqtt_action_create_config(#{
<<"connector">> => ConnectorName
})
),
?assertMatch(
{200, [#{<<"rules">> := []}]},
list_actions_api()
),
?assertMatch(
{200, #{<<"rules">> := []}},
get_action_api(ActionType, ActionName1)
),
{ok, #{<<"id">> := RuleId1}} = create_action_rule(ActionType, ActionName1),
?assertMatch(
{200, [#{<<"rules">> := [RuleId1]}]},
list_actions_api()
),
?assertMatch(
{200, #{<<"rules">> := [RuleId1]}},
get_action_api(ActionType, ActionName1)
),
?assertMatch(
{200, []},
list_sources_api()
),
SourceName1 = <<"s1">>,
{201, _} = create_source_api(
SourceName1,
?SOURCE_TYPE,
source_create_config(#{
<<"connector">> => ConnectorName
})
),
?assertMatch(
{200, [#{<<"rules">> := []}]},
list_sources_api()
),
?assertMatch(
{200, #{<<"rules">> := []}},
get_source_api(SourceType, SourceName1)
),
%% Action remains untouched
?assertMatch(
{200, [#{<<"rules">> := [RuleId1]}]},
list_actions_api()
),
?assertMatch(
{200, #{<<"rules">> := [RuleId1]}},
get_action_api(ActionType, ActionName1)
),
%% using "$bridges/..." hookpoint
{ok, #{<<"id">> := RuleId2}} = create_source_rule1(SourceType, SourceName1),
?assertMatch(
{200, [#{<<"rules">> := [RuleId2]}]},
list_sources_api()
),
?assertMatch(
{200, #{<<"rules">> := [RuleId2]}},
get_source_api(SourceType, SourceName1)
),
%% Action remains untouched
?assertMatch(
{200, [#{<<"rules">> := [RuleId1]}]},
list_actions_api()
),
%% using "$sources/..." hookpoint
{ok, #{<<"id">> := RuleId3}} = create_source_rule2(SourceType, SourceName1),
?assertMatch(
{200, [#{<<"rules">> := [RuleId1]}]},
list_actions_api()
),
Rules = lists:sort([RuleId2, RuleId3]),
?assertMatch(
{200, [#{<<"rules">> := Rules}]},
list_sources_api()
),
?assertMatch(
{200, #{<<"rules">> := Rules}},
get_source_api(SourceType, SourceName1)
),
ok
end,
[]
),
ok.

View File

@ -415,6 +415,7 @@ common_init(ConfigT) ->
emqx_conf, emqx_conf,
emqx_bridge_hstreamdb, emqx_bridge_hstreamdb,
emqx_bridge, emqx_bridge,
emqx_rule_engine,
emqx_management, emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard() emqx_mgmt_api_test_util:emqx_dashboard()
], ],

View File

@ -655,7 +655,22 @@ format_resource_data(error, undefined, Result) ->
format_resource_data(error, Error, Result) -> format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_utils:readable_error_msg(Error)}; Result#{status_reason => emqx_utils:readable_error_msg(Error)};
format_resource_data(channels, Channels, Result) -> format_resource_data(channels, Channels, Result) ->
Result#{actions => lists:map(fun format_action/1, maps:keys(Channels))}; #{
actions := Actions,
sources := Sources
} = lists:foldl(
fun(Id, Acc) ->
case emqx_bridge_v2:parse_id(Id) of
#{kind := source, name := Name} ->
maps:update_with(sources, fun(Ss) -> [Name | Ss] end, Acc);
#{name := Name} ->
maps:update_with(actions, fun(As) -> [Name | As] end, Acc)
end
end,
#{actions => [], sources => []},
maps:keys(Channels)
),
Result#{actions => lists:sort(Actions), sources => lists:sort(Sources)};
format_resource_data(K, V, Result) -> format_resource_data(K, V, Result) ->
Result#{K => V}. Result#{K => V}.
@ -673,12 +688,6 @@ unpack_connector_conf(Type, PackedConf) ->
#{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
RawConf. RawConf.
format_action(ActionId) ->
case emqx_bridge_v2:parse_id(ActionId) of
#{name := Name} ->
Name
end.
is_ok(ok) -> is_ok(ok) ->
ok; ok;
is_ok(OkResult = {ok, _}) -> is_ok(OkResult = {ok, _}) ->

View File

@ -109,9 +109,10 @@
emqx_conf, emqx_conf,
emqx, emqx,
emqx_auth, emqx_auth,
emqx_management, emqx_connector,
{emqx_connector, "connectors {}"}, emqx_bridge,
{emqx_bridge, "actions {}"} emqx_rule_engine,
emqx_management
]). ]).
-define(APPSPEC_DASHBOARD, -define(APPSPEC_DASHBOARD,

View File

@ -47,6 +47,8 @@
get_rules_for_topic/1, get_rules_for_topic/1,
get_rules_with_same_event/1, get_rules_with_same_event/1,
get_rule_ids_by_action/1, get_rule_ids_by_action/1,
get_rule_ids_by_bridge_action/1,
get_rule_ids_by_bridge_source/1,
ensure_action_removed/2, ensure_action_removed/2,
get_rules_ordered_by_ts/0 get_rules_ordered_by_ts/0
]). ]).
@ -108,6 +110,8 @@
-define(RATE_METRICS, ['matched']). -define(RATE_METRICS, ['matched']).
-type action_name() :: binary() | #{function := binary()}. -type action_name() :: binary() | #{function := binary()}.
-type bridge_action_id() :: binary().
-type bridge_source_id() :: binary().
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
start_link() -> start_link() ->
@ -255,6 +259,24 @@ get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
contains_actions(Acts, Mod, Fun) contains_actions(Acts, Mod, Fun)
]. ].
-spec get_rule_ids_by_bridge_action(bridge_action_id()) -> [binary()].
get_rule_ids_by_bridge_action(ActionId) ->
%% ActionId = <<"type:name">>
[
Id
|| #{actions := Acts, id := Id} <- get_rules(),
forwards_to_bridge(Acts, ActionId)
].
-spec get_rule_ids_by_bridge_source(bridge_source_id()) -> [binary()].
get_rule_ids_by_bridge_source(SourceId) ->
%% SourceId = <<"type:name">>
[
Id
|| #{from := Froms, id := Id} <- get_rules(),
references_ingress_bridge(Froms, SourceId)
].
-spec ensure_action_removed(rule_id(), action_name()) -> ok. -spec ensure_action_removed(rule_id(), action_name()) -> ok.
ensure_action_removed(RuleId, ActionName) -> ensure_action_removed(RuleId, ActionName) ->
FilterFunc = FilterFunc =

View File

@ -0,0 +1,3 @@
The lists of actions and sources that depend on a given connector are now returned in the `GET /connectors` and `GET /connectors/:id` APIs.
The list of rules that depend on a given action or source is now returned in the `GET /actions`, `GET /sources`, `GET /actions/:id` and `GET /sources/:id` APIs.