From 1824e7efcc4bcf485111f9643cbe3feb79df811e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 13:49:35 -0300 Subject: [PATCH] fix(rule_engine): count referenced bridges in `from` clauses as dependencies (rv5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9325 Currently, ingress bridges referenced in the `FROM` clause of rules are not being accounted as dependencies. When we try to delete an ingress bridge that's referenced in a rule like `select * from "$bridges/mqtt:ingress"`, that bridge does not trigger an UI warning about dependent actions. --- .../include/emqx_bridge_resource.hrl | 22 +++++++++++++ apps/emqx_bridge/src/emqx_bridge_resource.erl | 16 +++++++-- .../test/emqx_bridge_resource_tests.erl | 33 +++++++++++++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 33 ++++++++++++++++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 7 ++-- .../test/emqx_rule_engine_SUITE.erl | 25 +++++++++++++- changes/ce/fix-10251.en.md | 3 ++ 7 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 apps/emqx_bridge/include/emqx_bridge_resource.hrl create mode 100644 apps/emqx_bridge/test/emqx_bridge_resource_tests.erl create mode 100644 changes/ce/fix-10251.en.md diff --git a/apps/emqx_bridge/include/emqx_bridge_resource.hrl b/apps/emqx_bridge/include/emqx_bridge_resource.hrl new file mode 100644 index 000000000..fcf1c41a4 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge_resource.hrl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_RESOURCE_HRL). +-define(EMQX_BRIDGE_RESOURCE_HRL, true). + +-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>). + +-endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 6426a46b7..b43cbe0ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_resource). +-include("emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -23,7 +24,9 @@ resource_id/1, resource_id/2, bridge_id/2, - parse_bridge_id/1 + parse_bridge_id/1, + bridge_hookpoint/1, + bridge_hookpoint_to_bridge_id/1 ]). -export([ @@ -89,6 +92,14 @@ parse_bridge_id(BridgeId) -> ) end. +bridge_hookpoint(BridgeId) -> + <<"$bridges/", (bin(BridgeId))/binary>>. + +bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) -> + {ok, BridgeId}; +bridge_hookpoint_to_bridge_id(_) -> + {error, bad_bridge_hookpoint}. + validate_name(Name0) -> Name = unicode:characters_to_list(Name0, utf8), case is_list(Name) andalso Name =/= [] of @@ -308,7 +319,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. BId = bridge_id(Type, Name), - Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name}; + BridgeHookpoint = bridge_hookpoint(BId), + Conf#{hookpoint => BridgeHookpoint, bridge_name => Name}; %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. parse_confs(<<"kafka">> = _Type, Name, Conf) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl new file mode 100644 index 000000000..a8a83ff6a --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_resource_tests). + +-include_lib("eunit/include/eunit.hrl"). + +bridge_hookpoint_test_() -> + BridgeId = emqx_bridge_resource:bridge_id(type, name), + BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId), + [ + ?_assertEqual(<<"$bridges/type:name">>, BridgeHookpoint), + ?_assertEqual( + {ok, BridgeId}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeHookpoint) + ), + ?_assertEqual( + {error, bad_bridge_hookpoint}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeId) + ) + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f15290547..d494a4740 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -213,11 +213,12 @@ get_rules_with_same_event(Topic) -> ]. -spec get_rule_ids_by_action(action_name()) -> [rule_id()]. -get_rule_ids_by_action(ActionName) when is_binary(ActionName) -> +get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) -> [ Id - || #{actions := Acts, id := Id} <- get_rules(), - lists:any(fun(A) -> A =:= ActionName end, Acts) + || #{actions := Acts, id := Id, from := Froms} <- get_rules(), + forwards_to_bridge(Acts, BridgeId) orelse + references_ingress_bridge(Froms, BridgeId) ]; get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> {Mod, Fun} = @@ -317,8 +318,14 @@ get_basic_usage_info() -> NumRules = length(EnabledRules), ReferencedBridges = lists:foldl( - fun(#{actions := Actions, from := From}, Acc) -> - BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], + fun(#{actions := Actions, from := Froms}, Acc) -> + BridgeIDs0 = + [ + BridgeID + || From <- Froms, + {ok, BridgeID} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ], BridgeIDs1 = lists:filter(fun is_binary/1, Actions), tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) end, @@ -478,3 +485,19 @@ contains_actions(Actions, Mod0, Func0) -> end, Actions ). + +forwards_to_bridge(Actions, BridgeId) -> + lists:any(fun(A) -> A =:= BridgeId end, Actions). + +references_ingress_bridge(Froms, BridgeId) -> + lists:any( + fun(ReferenceBridgeId) -> + BridgeId =:= ReferenceBridgeId + end, + [ + RefBridgeId + || From <- Froms, + {ok, RefBridgeId} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ] + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0c962f1fa..7f14f6d8b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge_resource.hrl"). -export([ reload/0, @@ -1011,7 +1012,7 @@ hook_fun_name(HookPoint) -> HookFunName. %% return static function references to help static code checks -hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2; +hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2; hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; @@ -1034,7 +1035,7 @@ ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). -event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_name(<<"$events/client_connected">>) -> 'client.connected'; event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; event_name(<<"$events/client_connack">>) -> 'client.connack'; @@ -1047,7 +1048,7 @@ event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. -event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c986cd365..2de013975 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -25,6 +25,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). -define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)). @@ -198,8 +200,11 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(t_events, Config) -> ets:delete(events_record_tab), - ok = delete_rule(?config(hook_points_rules, Config)); + ok = delete_rule(?config(hook_points_rules, Config)), + emqx_common_test_helpers:call_janitor(), + ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -2683,6 +2688,24 @@ t_get_basic_usage_info_1(_Config) -> ), ok. +t_get_rule_ids_by_action_reference_ingress_bridge(_Config) -> + BridgeId = <<"mqtt:ingress">>, + RuleId = <<"rule:ingress_bridge_referenced">>, + {ok, _} = + emqx_rule_engine:create_rule( + #{ + id => RuleId, + sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>, + actions => [#{function => console}] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + ?assertMatch( + [RuleId], + emqx_rule_engine:get_rule_ids_by_action(BridgeId) + ), + ok. + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10251.en.md b/changes/ce/fix-10251.en.md new file mode 100644 index 000000000..84102f952 --- /dev/null +++ b/changes/ce/fix-10251.en.md @@ -0,0 +1,3 @@ +Consider bridges referenced in `FROM` rule clauses as dependencies. + +Before this fix, when one tried to delete an ingress rule referenced in an action like `select * from "$bridges/mqtt:ingress"`, the UI would not trigger a warning about dependent rule actions.