From 60a90858f8745fbe1c46154007e805f69b9a2c64 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 6 Sep 2022 15:34:23 +0800 Subject: [PATCH] feat: check dependent actions before removing the bridges --- apps/emqx_bridge/src/emqx_bridge.erl | 19 ++++ apps/emqx_bridge/src/emqx_bridge_api.erl | 26 ++++- .../test/emqx_bridge_api_SUITE.erl | 82 +++++++++++++- .../test/emqx_dashboard_api_test_helpers.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 61 +++++++++++ .../src/emqx_rule_sqltester.erl | 8 +- .../test/emqx_rule_engine_SUITE.erl | 103 +++++++++++++++++- 7 files changed, 286 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 151275bd9..d4d24ef3a 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -37,6 +37,7 @@ create/3, disable_enable/3, remove/2, + check_deps_and_remove/3, list/0 ]). @@ -247,6 +248,24 @@ remove(BridgeType, BridgeName) -> #{override_to => cluster} ). +check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> + Id = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + %% NOTE: This violates the design: Rule depends on data-bridge but not vice versa. + case emqx_rule_engine:get_rule_ids_by_action(Id) of + [] -> + remove(BridgeType, BridgeName); + Rules when RemoveDeps =:= false -> + {error, {rules_deps_on_this_bridge, Rules}}; + Rules when RemoveDeps =:= true -> + lists:foreach( + fun(R) -> + emqx_rule_engine:ensure_action_removed(R, Id) + end, + Rules + ), + remove(BridgeType, BridgeName) + end. + %%======================================================================================== %% Helper functions %%======================================================================================== diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index aa6a6af89..df5121f67 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -331,6 +331,7 @@ schema("/bridges/:id") -> responses => #{ 204 => <<"Bridge deleted">>, 400 => error_schema(['INVALID_ID'], "Update bridge failed"), + 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } @@ -424,13 +425,28 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)} end ); -'/bridges/:id'(delete, #{bindings := #{id := Id}}) -> +'/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> + AlsoDeleteActs = + case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of + <<"true">> -> true; + true -> true; + _ -> false + end, ?TRY_PARSE_ID( Id, - case emqx_bridge:remove(BridgeType, BridgeName) of - {ok, _} -> {204}; - {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} + case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActs) of + {ok, _} -> + 204; + {error, {rules_deps_on_this_bridge, RuleIds}} -> + {403, + error_msg( + 'FORBIDDEN_REQUEST', + {<<"There're some rules dependent on this bridge">>, RuleIds} + )}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end ). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 9346fb9c0..c0a58abcc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -61,14 +61,18 @@ init_per_suite(Config) -> _ = application:stop(emqx_resource), _ = application:stop(emqx_connector), ok = emqx_common_test_helpers:start_apps( - [emqx_bridge, emqx_dashboard], + [emqx_rule_engine, emqx_bridge, emqx_dashboard], fun set_special_configs/1 ), + ok = emqx_common_test_helpers:load_config( + emqx_rule_engine_schema, + <<"rule_engine {rules {}}">> + ), ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_dashboard]), ok. set_special_configs(emqx_dashboard) -> @@ -301,6 +305,80 @@ t_http_crud_apis(Config) -> ), ok. +t_check_dependent_actions_on_delete(Config) -> + Port = ?config(port, Config), + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a webhook bridge, using POST + %% POST /bridges/ will create a bridge + URL1 = ?URL(Port, "path1"), + Name = <<"t_http_crud_apis">>, + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 201, _} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + {ok, 201, Rule} = request( + post, + uri(["rules"]), + #{ + <<"name">> => <<"t_http_crud_apis">>, + <<"enable">> => true, + <<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"t\"">> + } + ), + #{<<"id">> := RuleId} = jsx:decode(Rule), + %% delete the bridge should fail because there is a rule depenents on it + {ok, 403, _} = request(delete, uri(["bridges", BridgeID]), []), + %% delete the rule first + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + %% then delete the bridge is OK + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. + +t_cascade_delete_actions(Config) -> + Port = ?config(port, Config), + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a webhook bridge, using POST + %% POST /bridges/ will create a bridge + URL1 = ?URL(Port, "path1"), + Name = <<"t_http_crud_apis">>, + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 201, _} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + {ok, 201, Rule} = request( + post, + uri(["rules"]), + #{ + <<"name">> => <<"t_http_crud_apis">>, + <<"enable">> => true, + <<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"t\"">> + } + ), + #{<<"id">> := RuleId} = jsx:decode(Rule), + %% delete the bridge will also delete the actions from the rules + {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + ?assertMatch( + #{ + <<"actions">> := [] + }, + jsx:decode(Rule1) + ), + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + ok. + t_start_stop_bridges_node(Config) -> do_start_stop_bridges(node, Config). diff --git a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl index 6b3891ef3..b74a118d2 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl @@ -92,7 +92,7 @@ request(Username, Method, Url, Body) -> uri() -> uri([]). uri(Parts) when is_list(Parts) -> NParts = [E || E <- Parts], - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + ?HOST ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])). auth_header(Username) -> Password = <<"public">>, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f991c7b4f..5995b3f53 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -46,6 +46,8 @@ get_rules/0, get_rules_for_topic/1, get_rules_with_same_event/1, + get_rule_ids_by_action/1, + ensure_action_removed/2, get_rules_ordered_by_ts/0 ]). @@ -99,6 +101,8 @@ -define(RATE_METRICS, ['matched']). +-type action_name() :: binary() | #{function := binary()}. + config_key_path() -> [rule_engine, rules]. @@ -208,6 +212,46 @@ get_rules_with_same_event(Topic) -> lists:any(fun(T) -> is_of_event_name(EventName, T) end, From) ]. +-spec get_rule_ids_by_action(action_name()) -> [rule_id()]. +get_rule_ids_by_action(ActionName) when is_binary(ActionName) -> + [ + Id + || #{actions := Acts, id := Id} <- get_rules(), + lists:any(fun(A) -> A =:= ActionName end, Acts) + ]; +get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> + {Mod, Fun} = + case string:split(FuncName, ":", leading) of + [M, F] -> {binary_to_module(M), F}; + [F] -> {emqx_rule_actions, F} + end, + [ + Id + || #{actions := Acts, id := Id} <- get_rules(), + contains_actions(Acts, Mod, Fun) + ]. + +-spec ensure_action_removed(rule_id(), action_name()) -> ok. +ensure_action_removed(RuleId, ActionName) -> + FilterFunc = + fun + (Func, Func) -> false; + (#{<<"function">> := Func}, #{function := Func}) -> false; + (_, _) -> true + end, + case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of + not_found -> + ok; + #{<<"actions">> := Acts} -> + NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)], + {ok, _} = emqx_conf:update( + emqx_rule_engine:config_key_path() ++ [RuleId, actions], + NewActs, + #{override_to => cluster} + ), + ok + end. + is_of_event_name(EventName, Topic) -> EventName =:= emqx_rule_events:event_name(Topic). @@ -413,3 +457,20 @@ now_ms() -> bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B. + +binary_to_module(ModName) -> + try + binary_to_existing_atom(ModName, utf8) + catch + error:badarg -> + not_exist_mod + end. + +contains_actions(Actions, Mod0, Func0) -> + lists:any( + fun + (#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true; + (_) -> false + end, + Actions + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 9669e0113..4de63e94f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -19,7 +19,6 @@ -export([ test/1, - echo_action/2, get_selected_data/3 ]). @@ -70,7 +69,8 @@ test_rule(Sql, Select, Context, EventTopics) -> ok = emqx_rule_engine:clear_metrics_for_rule(RuleId) end. -get_selected_data(Selected, _Envs, _Args) -> +get_selected_data(Selected, Envs, Args) -> + ?TRACE("RULE", "testing_rule_sql_ok", #{selected => Selected, envs => Envs, args => Args}), {ok, Selected}. is_publish_topic(<<"$events/", _/binary>>) -> false; @@ -84,10 +84,6 @@ flatten([{ok, D}]) -> flatten([D | L]) when is_list(D) -> [D0 || {ok, D0} <- D] ++ flatten(L). -echo_action(Data, Envs) -> - ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}), - {ok, Data}. - fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 4e0c7dfe3..ba3a2cc30 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -52,7 +52,9 @@ groups() -> t_create_existing_rule, t_get_rules_for_topic, t_get_rules_for_topic_2, - t_get_rules_with_same_event + t_get_rules_with_same_event, + t_get_rule_ids_by_action, + t_ensure_action_removed ]}, {runtime, [], [ t_match_atom_and_binary, @@ -431,6 +433,105 @@ t_get_rules_with_same_event(_Config) -> ]), ok. +t_get_rule_ids_by_action(_) -> + ID = <<"t_get_rule_ids_by_action">>, + Rule1 = #{ + enable => false, + id => ID, + sql => <<"SELECT * FROM \"t\"">>, + from => [<<"t">>], + fields => [<<"*">>], + is_foreach => false, + conditions => {}, + actions => [ + #{mod => emqx_rule_actions, func => console, args => #{}}, + #{mod => emqx_rule_actions, func => republish, args => #{}}, + <<"mqtt:my_mqtt_bridge">>, + <<"mysql:foo">> + ], + description => ID, + created_at => erlang:system_time(millisecond) + }, + ok = insert_rules([Rule1]), + ?assertMatch( + [ID], + emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>}) + ), + ?assertMatch( + [ID], + emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:republish">>}) + ), + ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(#{function => <<"some_mod:fun">>})), + ?assertMatch([ID], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:foo">>)), + ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:not_exists">>)), + ok = delete_rules_by_ids([<<"t_get_rule_ids_by_action">>]). + +t_ensure_action_removed(_) -> + Id = <<"t_ensure_action_removed">>, + GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, + emqx:update_config( + [rule_engine, rules], + #{ + Id => #{ + <<"actions">> => [ + #{<<"function">> => GetSelectedData}, + #{<<"function">> => <<"console">>}, + #{<<"function">> => <<"republish">>}, + <<"mysql:foo">>, + <<"mqtt:bar">> + ], + <<"description">> => <<"">>, + <<"sql">> => <<"SELECT * FROM \"t/#\"">> + } + } + ), + ?assertMatch( + #{ + <<"actions">> := [ + #{<<"function">> := GetSelectedData}, + #{<<"function">> := <<"console">>}, + #{<<"function">> := <<"republish">>}, + <<"mysql:foo">>, + <<"mqtt:bar">> + ] + }, + emqx:get_raw_config([rule_engine, rules, Id]) + ), + ok = emqx_rule_engine:ensure_action_removed(Id, #{function => <<"console">>}), + ?assertMatch( + #{ + <<"actions">> := [ + #{<<"function">> := GetSelectedData}, + #{<<"function">> := <<"republish">>}, + <<"mysql:foo">>, + <<"mqtt:bar">> + ] + }, + emqx:get_raw_config([rule_engine, rules, Id]) + ), + ok = emqx_rule_engine:ensure_action_removed(Id, <<"mysql:foo">>), + ?assertMatch( + #{ + <<"actions">> := [ + #{<<"function">> := GetSelectedData}, + #{<<"function">> := <<"republish">>}, + <<"mqtt:bar">> + ] + }, + emqx:get_raw_config([rule_engine, rules, Id]) + ), + ok = emqx_rule_engine:ensure_action_removed(Id, #{function => GetSelectedData}), + ?assertMatch( + #{ + <<"actions">> := [ + #{<<"function">> := <<"republish">>}, + <<"mqtt:bar">> + ] + }, + emqx:get_raw_config([rule_engine, rules, Id]) + ), + emqx:remove_config([rule_engine, rules, Id]). + %%------------------------------------------------------------------------------ %% Test cases for rule runtime %%------------------------------------------------------------------------------