feat(sources & actions api): add dependent rules to response
Fixes https://emqx.atlassian.net/browse/EMQX-12654
This commit is contained in:
parent
ae828e8cfb
commit
4d174b8678
|
@ -18,5 +18,6 @@
|
||||||
-define(EMQX_BRIDGE_RESOURCE_HRL, true).
|
-define(EMQX_BRIDGE_RESOURCE_HRL, true).
|
||||||
|
|
||||||
-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>).
|
-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>).
|
||||||
|
-define(SOURCE_HOOKPOINT(BridgeId), <<"$sources/", BridgeId/binary>>).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -132,6 +132,8 @@ bridge_hookpoint(BridgeId) ->
|
||||||
|
|
||||||
bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
|
bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
|
||||||
{ok, BridgeId};
|
{ok, BridgeId};
|
||||||
|
bridge_hookpoint_to_bridge_id(?SOURCE_HOOKPOINT(BridgeId)) ->
|
||||||
|
{ok, BridgeId};
|
||||||
bridge_hookpoint_to_bridge_id(_) ->
|
bridge_hookpoint_to_bridge_id(_) ->
|
||||||
{error, bad_bridge_hookpoint}.
|
{error, bad_bridge_hookpoint}.
|
||||||
|
|
||||||
|
|
|
@ -790,7 +790,7 @@ handle_list(ConfRootKey) ->
|
||||||
[format_resource(ConfRootKey, Data, Node) || Data <- Bridges]
|
[format_resource(ConfRootKey, Data, Node) || Data <- Bridges]
|
||||||
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges)
|
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges)
|
||||||
],
|
],
|
||||||
?OK(zip_bridges(AllBridges));
|
?OK(zip_bridges(ConfRootKey, AllBridges));
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?INTERNAL_ERROR(Reason)
|
?INTERNAL_ERROR(Reason)
|
||||||
end.
|
end.
|
||||||
|
@ -987,8 +987,9 @@ lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
{ok, [{ok, _} | _] = Results} ->
|
{ok, [{ok, _} | _] = Results0} ->
|
||||||
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
Results = [R || {ok, R} <- Results0],
|
||||||
|
{SuccCode, format_bridge_info(ConfRootKey, BridgeType, BridgeName, Results)};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -1146,11 +1147,11 @@ maybe_unwrap({error, not_implemented}) ->
|
||||||
maybe_unwrap(RpcMulticallResult) ->
|
maybe_unwrap(RpcMulticallResult) ->
|
||||||
emqx_rpc:unwrap_erpc(RpcMulticallResult).
|
emqx_rpc:unwrap_erpc(RpcMulticallResult).
|
||||||
|
|
||||||
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
|
zip_bridges(ConfRootKey, [BridgesFirstNode | _] = BridgesAllNodes) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{type := Type, name := Name}, Acc) ->
|
fun(#{type := Type, name := Name}, Acc) ->
|
||||||
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
|
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
|
||||||
[format_bridge_info(Bridges) | Acc]
|
[format_bridge_info(ConfRootKey, Type, Name, Bridges) | Acc]
|
||||||
end,
|
end,
|
||||||
[],
|
[],
|
||||||
BridgesFirstNode
|
BridgesFirstNode
|
||||||
|
@ -1184,12 +1185,19 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
|
||||||
BridgesAllNodes
|
BridgesAllNodes
|
||||||
).
|
).
|
||||||
|
|
||||||
format_bridge_info([FirstBridge | _] = Bridges) ->
|
format_bridge_info(ConfRootKey, Type, Name, [FirstBridge | _] = Bridges) ->
|
||||||
Res = maps:remove(node, FirstBridge),
|
Res = maps:remove(node, FirstBridge),
|
||||||
NodeStatus = node_status(Bridges),
|
NodeStatus = node_status(Bridges),
|
||||||
|
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
||||||
|
Rules =
|
||||||
|
case ConfRootKey of
|
||||||
|
actions -> emqx_rule_engine:get_rule_ids_by_bridge_action(Id);
|
||||||
|
sources -> emqx_rule_engine:get_rule_ids_by_bridge_source(Id)
|
||||||
|
end,
|
||||||
redact(Res#{
|
redact(Res#{
|
||||||
status => aggregate_status(NodeStatus),
|
status => aggregate_status(NodeStatus),
|
||||||
node_status => NodeStatus
|
node_status => NodeStatus,
|
||||||
|
rules => lists:sort(Rules)
|
||||||
}).
|
}).
|
||||||
|
|
||||||
node_status(Bridges) ->
|
node_status(Bridges) ->
|
||||||
|
|
|
@ -289,7 +289,8 @@ end_per_testcase(_TestCase, Config) ->
|
||||||
|
|
||||||
skip_connector_creation_test_cases() ->
|
skip_connector_creation_test_cases() ->
|
||||||
[
|
[
|
||||||
t_connector_dependencies
|
t_connector_dependencies,
|
||||||
|
t_kind_dependencies
|
||||||
].
|
].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -608,6 +609,14 @@ get_connector_api(Type, Name) ->
|
||||||
Res = emqx_bridge_v2_testlib:get_connector_api(Type, Name),
|
Res = emqx_bridge_v2_testlib:get_connector_api(Type, Name),
|
||||||
emqx_mgmt_api_test_util:simplify_result(Res).
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
|
get_source_api(Type, Name) ->
|
||||||
|
Res = emqx_bridge_v2_testlib:get_bridge_api(source, Type, Name),
|
||||||
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
|
get_action_api(Type, Name) ->
|
||||||
|
Res = emqx_bridge_v2_testlib:get_bridge_api(action, Type, Name),
|
||||||
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
create_source_api(Name, Type, Params) ->
|
create_source_api(Name, Type, Params) ->
|
||||||
Res = emqx_bridge_v2_testlib:create_kind_api([
|
Res = emqx_bridge_v2_testlib:create_kind_api([
|
||||||
{bridge_kind, source},
|
{bridge_kind, source},
|
||||||
|
@ -626,6 +635,43 @@ create_action_api(Name, Type, Params) ->
|
||||||
]),
|
]),
|
||||||
emqx_mgmt_api_test_util:simplify_result(Res).
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
|
list_sources_api() ->
|
||||||
|
Res = emqx_bridge_v2_testlib:list_sources_http_api(),
|
||||||
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
|
list_actions_api() ->
|
||||||
|
Res = emqx_bridge_v2_testlib:list_actions_http_api(),
|
||||||
|
emqx_mgmt_api_test_util:simplify_result(Res).
|
||||||
|
|
||||||
|
create_action_rule(ActionType, ActionName) ->
|
||||||
|
RuleTopic = <<"t/", ActionName/binary>>,
|
||||||
|
Config = [{action_name, ActionName}],
|
||||||
|
emqx_bridge_v2_testlib:create_rule_and_action_http(ActionType, RuleTopic, Config).
|
||||||
|
|
||||||
|
create_source_rule1(SourceType, SourceName) ->
|
||||||
|
RuleTopic = <<"t/", SourceName/binary>>,
|
||||||
|
Config = [{action_name, <<"unused">>}],
|
||||||
|
Id = emqx_bridge_resource:bridge_id(SourceType, SourceName),
|
||||||
|
Opts = #{
|
||||||
|
overrides => #{
|
||||||
|
sql => <<"select * from \"$bridges/", Id/binary, "\"">>,
|
||||||
|
actions => []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts).
|
||||||
|
|
||||||
|
create_source_rule2(SourceType, SourceName) ->
|
||||||
|
RuleTopic = <<"t/", SourceName/binary>>,
|
||||||
|
Config = [{action_name, <<"unused">>}],
|
||||||
|
Id = emqx_bridge_resource:bridge_id(SourceType, SourceName),
|
||||||
|
Opts = #{
|
||||||
|
overrides => #{
|
||||||
|
sql => <<"select * from \"$sources/", Id/binary, "\"">>,
|
||||||
|
actions => []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_bridge_v2_testlib:create_rule_and_action_http(SourceType, RuleTopic, Config, Opts).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -1743,3 +1789,121 @@ t_connector_dependencies(Config) when is_list(Config) ->
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Verifies that listing actions/sources return the rules that depend on them.
|
||||||
|
t_kind_dependencies(matrix) ->
|
||||||
|
[
|
||||||
|
[single, actions],
|
||||||
|
[single, sources]
|
||||||
|
];
|
||||||
|
t_kind_dependencies(Config) when is_list(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
%% This particular source type happens to serve both actions and sources, a
|
||||||
|
%% nice edge case for this test.
|
||||||
|
ActionType = ?SOURCE_TYPE,
|
||||||
|
SourceType = ?SOURCE_TYPE,
|
||||||
|
ConnectorType = ?SOURCE_CONNECTOR_TYPE,
|
||||||
|
ConnectorName = <<"c">>,
|
||||||
|
{ok, {{_, 201, _}, _, _}} =
|
||||||
|
emqx_bridge_v2_testlib:create_connector_api([
|
||||||
|
{connector_config, source_connector_create_config(#{})},
|
||||||
|
{connector_name, ConnectorName},
|
||||||
|
{connector_type, ConnectorType}
|
||||||
|
]),
|
||||||
|
|
||||||
|
ActionName1 = <<"a1">>,
|
||||||
|
{201, _} = create_action_api(
|
||||||
|
ActionName1,
|
||||||
|
ActionType,
|
||||||
|
mqtt_action_create_config(#{
|
||||||
|
<<"connector">> => ConnectorName
|
||||||
|
})
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := []}]},
|
||||||
|
list_actions_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := []}},
|
||||||
|
get_action_api(ActionType, ActionName1)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, #{<<"id">> := RuleId1}} = create_action_rule(ActionType, ActionName1),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := [RuleId1]}]},
|
||||||
|
list_actions_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := [RuleId1]}},
|
||||||
|
get_action_api(ActionType, ActionName1)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, []},
|
||||||
|
list_sources_api()
|
||||||
|
),
|
||||||
|
|
||||||
|
SourceName1 = <<"s1">>,
|
||||||
|
{201, _} = create_source_api(
|
||||||
|
SourceName1,
|
||||||
|
?SOURCE_TYPE,
|
||||||
|
source_create_config(#{
|
||||||
|
<<"connector">> => ConnectorName
|
||||||
|
})
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := []}]},
|
||||||
|
list_sources_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := []}},
|
||||||
|
get_source_api(SourceType, SourceName1)
|
||||||
|
),
|
||||||
|
%% Action remains untouched
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := [RuleId1]}]},
|
||||||
|
list_actions_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := [RuleId1]}},
|
||||||
|
get_action_api(ActionType, ActionName1)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% using "$bridges/..." hookpoint
|
||||||
|
{ok, #{<<"id">> := RuleId2}} = create_source_rule1(SourceType, SourceName1),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := [RuleId2]}]},
|
||||||
|
list_sources_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := [RuleId2]}},
|
||||||
|
get_source_api(SourceType, SourceName1)
|
||||||
|
),
|
||||||
|
%% Action remains untouched
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := [RuleId1]}]},
|
||||||
|
list_actions_api()
|
||||||
|
),
|
||||||
|
|
||||||
|
%% using "$sources/..." hookpoint
|
||||||
|
{ok, #{<<"id">> := RuleId3}} = create_source_rule2(SourceType, SourceName1),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := [RuleId1]}]},
|
||||||
|
list_actions_api()
|
||||||
|
),
|
||||||
|
Rules = lists:sort([RuleId2, RuleId3]),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{<<"rules">> := Rules}]},
|
||||||
|
list_sources_api()
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"rules">> := Rules}},
|
||||||
|
get_source_api(SourceType, SourceName1)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -415,6 +415,7 @@ common_init(ConfigT) ->
|
||||||
emqx_conf,
|
emqx_conf,
|
||||||
emqx_bridge_hstreamdb,
|
emqx_bridge_hstreamdb,
|
||||||
emqx_bridge,
|
emqx_bridge,
|
||||||
|
emqx_rule_engine,
|
||||||
emqx_management,
|
emqx_management,
|
||||||
emqx_mgmt_api_test_util:emqx_dashboard()
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
],
|
],
|
||||||
|
|
|
@ -47,6 +47,8 @@
|
||||||
get_rules_for_topic/1,
|
get_rules_for_topic/1,
|
||||||
get_rules_with_same_event/1,
|
get_rules_with_same_event/1,
|
||||||
get_rule_ids_by_action/1,
|
get_rule_ids_by_action/1,
|
||||||
|
get_rule_ids_by_bridge_action/1,
|
||||||
|
get_rule_ids_by_bridge_source/1,
|
||||||
ensure_action_removed/2,
|
ensure_action_removed/2,
|
||||||
get_rules_ordered_by_ts/0
|
get_rules_ordered_by_ts/0
|
||||||
]).
|
]).
|
||||||
|
@ -108,6 +110,8 @@
|
||||||
-define(RATE_METRICS, ['matched']).
|
-define(RATE_METRICS, ['matched']).
|
||||||
|
|
||||||
-type action_name() :: binary() | #{function := binary()}.
|
-type action_name() :: binary() | #{function := binary()}.
|
||||||
|
-type bridge_action_id() :: binary().
|
||||||
|
-type bridge_source_id() :: binary().
|
||||||
|
|
||||||
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -255,6 +259,24 @@ get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
|
||||||
contains_actions(Acts, Mod, Fun)
|
contains_actions(Acts, Mod, Fun)
|
||||||
].
|
].
|
||||||
|
|
||||||
|
-spec get_rule_ids_by_bridge_action(bridge_action_id()) -> [binary()].
|
||||||
|
get_rule_ids_by_bridge_action(ActionId) ->
|
||||||
|
%% ActionId = <<"type:name">>
|
||||||
|
[
|
||||||
|
Id
|
||||||
|
|| #{actions := Acts, id := Id} <- get_rules(),
|
||||||
|
forwards_to_bridge(Acts, ActionId)
|
||||||
|
].
|
||||||
|
|
||||||
|
-spec get_rule_ids_by_bridge_source(bridge_source_id()) -> [binary()].
|
||||||
|
get_rule_ids_by_bridge_source(SourceId) ->
|
||||||
|
%% SourceId = <<"type:name">>
|
||||||
|
[
|
||||||
|
Id
|
||||||
|
|| #{from := Froms, id := Id} <- get_rules(),
|
||||||
|
references_ingress_bridge(Froms, SourceId)
|
||||||
|
].
|
||||||
|
|
||||||
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
|
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
|
||||||
ensure_action_removed(RuleId, ActionName) ->
|
ensure_action_removed(RuleId, ActionName) ->
|
||||||
FilterFunc =
|
FilterFunc =
|
||||||
|
|
Loading…
Reference in New Issue