feat: check dependent actions before removing the bridges

This commit is contained in:
Shawn 2022-09-06 15:34:23 +08:00
parent c695e67e18
commit 60a90858f8
7 changed files with 286 additions and 15 deletions

View File

@ -37,6 +37,7 @@
create/3,
disable_enable/3,
remove/2,
check_deps_and_remove/3,
list/0
]).
@ -247,6 +248,24 @@ remove(BridgeType, BridgeName) ->
#{override_to => cluster}
).
check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
Id = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
%% NOTE: This violates the design: Rule depends on data-bridge but not vice versa.
case emqx_rule_engine:get_rule_ids_by_action(Id) of
[] ->
remove(BridgeType, BridgeName);
Rules when RemoveDeps =:= false ->
{error, {rules_deps_on_this_bridge, Rules}};
Rules when RemoveDeps =:= true ->
lists:foreach(
fun(R) ->
emqx_rule_engine:ensure_action_removed(R, Id)
end,
Rules
),
remove(BridgeType, BridgeName)
end.
%%========================================================================================
%% Helper functions
%%========================================================================================

View File

@ -331,6 +331,7 @@ schema("/bridges/:id") ->
responses => #{
204 => <<"Bridge deleted">>,
400 => error_schema(['INVALID_ID'], "Update bridge failed"),
403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
@ -424,13 +425,28 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
{404, error_msg('NOT_FOUND', <<"bridge not found">>)}
end
);
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
'/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
AlsoDeleteActs =
case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of
<<"true">> -> true;
true -> true;
_ -> false
end,
?TRY_PARSE_ID(
Id,
case emqx_bridge:remove(BridgeType, BridgeName) of
{ok, _} -> {204};
{error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActs) of
{ok, _} ->
204;
{error, {rules_deps_on_this_bridge, RuleIds}} ->
{403,
error_msg(
'FORBIDDEN_REQUEST',
{<<"There're some rules dependent on this bridge">>, RuleIds}
)};
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end
).

View File

@ -61,14 +61,18 @@ init_per_suite(Config) ->
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps(
[emqx_bridge, emqx_dashboard],
[emqx_rule_engine, emqx_bridge, emqx_dashboard],
fun set_special_configs/1
),
ok = emqx_common_test_helpers:load_config(
emqx_rule_engine_schema,
<<"rule_engine {rules {}}">>
),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_dashboard]),
emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_dashboard]),
ok.
set_special_configs(emqx_dashboard) ->
@ -301,6 +305,80 @@ t_http_crud_apis(Config) ->
),
ok.
t_check_dependent_actions_on_delete(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"t_http_crud_apis">>,
<<"enable">> => true,
<<"actions">> => [BridgeID],
<<"sql">> => <<"SELECT * from \"t\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% delete the bridge should fail because there is a rule depenents on it
{ok, 403, _} = request(delete, uri(["bridges", BridgeID]), []),
%% delete the rule first
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
%% then delete the bridge is OK
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_cascade_delete_actions(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"t_http_crud_apis">>,
<<"enable">> => true,
<<"actions">> => [BridgeID],
<<"sql">> => <<"SELECT * from \"t\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% delete the bridge will also delete the actions from the rules
{ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
?assertMatch(
#{
<<"actions">> := []
},
jsx:decode(Rule1)
),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
ok.
t_start_stop_bridges_node(Config) ->
do_start_stop_bridges(node, Config).

View File

@ -92,7 +92,7 @@ request(Username, Method, Url, Body) ->
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
?HOST ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
auth_header(Username) ->
Password = <<"public">>,

View File

@ -46,6 +46,8 @@
get_rules/0,
get_rules_for_topic/1,
get_rules_with_same_event/1,
get_rule_ids_by_action/1,
ensure_action_removed/2,
get_rules_ordered_by_ts/0
]).
@ -99,6 +101,8 @@
-define(RATE_METRICS, ['matched']).
-type action_name() :: binary() | #{function := binary()}.
config_key_path() ->
[rule_engine, rules].
@ -208,6 +212,46 @@ get_rules_with_same_event(Topic) ->
lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
].
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
get_rule_ids_by_action(ActionName) when is_binary(ActionName) ->
[
Id
|| #{actions := Acts, id := Id} <- get_rules(),
lists:any(fun(A) -> A =:= ActionName end, Acts)
];
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
{Mod, Fun} =
case string:split(FuncName, ":", leading) of
[M, F] -> {binary_to_module(M), F};
[F] -> {emqx_rule_actions, F}
end,
[
Id
|| #{actions := Acts, id := Id} <- get_rules(),
contains_actions(Acts, Mod, Fun)
].
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
ensure_action_removed(RuleId, ActionName) ->
FilterFunc =
fun
(Func, Func) -> false;
(#{<<"function">> := Func}, #{function := Func}) -> false;
(_, _) -> true
end,
case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
not_found ->
ok;
#{<<"actions">> := Acts} ->
NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
{ok, _} = emqx_conf:update(
emqx_rule_engine:config_key_path() ++ [RuleId, actions],
NewActs,
#{override_to => cluster}
),
ok
end.
is_of_event_name(EventName, Topic) ->
EventName =:= emqx_rule_events:event_name(Topic).
@ -413,3 +457,20 @@ now_ms() ->
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(B) when is_binary(B) -> B.
binary_to_module(ModName) ->
try
binary_to_existing_atom(ModName, utf8)
catch
error:badarg ->
not_exist_mod
end.
contains_actions(Actions, Mod0, Func0) ->
lists:any(
fun
(#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true;
(_) -> false
end,
Actions
).

View File

@ -19,7 +19,6 @@
-export([
test/1,
echo_action/2,
get_selected_data/3
]).
@ -70,7 +69,8 @@ test_rule(Sql, Select, Context, EventTopics) ->
ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
end.
get_selected_data(Selected, _Envs, _Args) ->
get_selected_data(Selected, Envs, Args) ->
?TRACE("RULE", "testing_rule_sql_ok", #{selected => Selected, envs => Envs, args => Args}),
{ok, Selected}.
is_publish_topic(<<"$events/", _/binary>>) -> false;
@ -84,10 +84,6 @@ flatten([{ok, D}]) ->
flatten([D | L]) when is_list(D) ->
[D0 || {ok, D0} <- D] ++ flatten(L).
echo_action(Data, Envs) ->
?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
{ok, Data}.
fill_default_values(Event, Context) ->
maps:merge(envs_examp(Event), Context).

View File

@ -52,7 +52,9 @@ groups() ->
t_create_existing_rule,
t_get_rules_for_topic,
t_get_rules_for_topic_2,
t_get_rules_with_same_event
t_get_rules_with_same_event,
t_get_rule_ids_by_action,
t_ensure_action_removed
]},
{runtime, [], [
t_match_atom_and_binary,
@ -431,6 +433,105 @@ t_get_rules_with_same_event(_Config) ->
]),
ok.
t_get_rule_ids_by_action(_) ->
ID = <<"t_get_rule_ids_by_action">>,
Rule1 = #{
enable => false,
id => ID,
sql => <<"SELECT * FROM \"t\"">>,
from => [<<"t">>],
fields => [<<"*">>],
is_foreach => false,
conditions => {},
actions => [
#{mod => emqx_rule_actions, func => console, args => #{}},
#{mod => emqx_rule_actions, func => republish, args => #{}},
<<"mqtt:my_mqtt_bridge">>,
<<"mysql:foo">>
],
description => ID,
created_at => erlang:system_time(millisecond)
},
ok = insert_rules([Rule1]),
?assertMatch(
[ID],
emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>})
),
?assertMatch(
[ID],
emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:republish">>})
),
?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(#{function => <<"some_mod:fun">>})),
?assertMatch([ID], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:foo">>)),
?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:not_exists">>)),
ok = delete_rules_by_ids([<<"t_get_rule_ids_by_action">>]).
t_ensure_action_removed(_) ->
Id = <<"t_ensure_action_removed">>,
GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
emqx:update_config(
[rule_engine, rules],
#{
Id => #{
<<"actions">> => [
#{<<"function">> => GetSelectedData},
#{<<"function">> => <<"console">>},
#{<<"function">> => <<"republish">>},
<<"mysql:foo">>,
<<"mqtt:bar">>
],
<<"description">> => <<"">>,
<<"sql">> => <<"SELECT * FROM \"t/#\"">>
}
}
),
?assertMatch(
#{
<<"actions">> := [
#{<<"function">> := GetSelectedData},
#{<<"function">> := <<"console">>},
#{<<"function">> := <<"republish">>},
<<"mysql:foo">>,
<<"mqtt:bar">>
]
},
emqx:get_raw_config([rule_engine, rules, Id])
),
ok = emqx_rule_engine:ensure_action_removed(Id, #{function => <<"console">>}),
?assertMatch(
#{
<<"actions">> := [
#{<<"function">> := GetSelectedData},
#{<<"function">> := <<"republish">>},
<<"mysql:foo">>,
<<"mqtt:bar">>
]
},
emqx:get_raw_config([rule_engine, rules, Id])
),
ok = emqx_rule_engine:ensure_action_removed(Id, <<"mysql:foo">>),
?assertMatch(
#{
<<"actions">> := [
#{<<"function">> := GetSelectedData},
#{<<"function">> := <<"republish">>},
<<"mqtt:bar">>
]
},
emqx:get_raw_config([rule_engine, rules, Id])
),
ok = emqx_rule_engine:ensure_action_removed(Id, #{function => GetSelectedData}),
?assertMatch(
#{
<<"actions">> := [
#{<<"function">> := <<"republish">>},
<<"mqtt:bar">>
]
},
emqx:get_raw_config([rule_engine, rules, Id])
),
emqx:remove_config([rule_engine, rules, Id]).
%%------------------------------------------------------------------------------
%% Test cases for rule runtime
%%------------------------------------------------------------------------------