fix(rule_engine): count referenced bridges in `from` clauses as dependencies (rv5.0)
Fixes https://emqx.atlassian.net/browse/EMQX-9325 Currently, ingress bridges referenced in the `FROM` clause of rules are not being accounted as dependencies. When we try to delete an ingress bridge that's referenced in a rule like `select * from "$bridges/mqtt:ingress"`, that bridge does not trigger an UI warning about dependent actions.
This commit is contained in:
parent
e59362a332
commit
1824e7efcc
|
@ -0,0 +1,22 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(EMQX_BRIDGE_RESOURCE_HRL).
|
||||||
|
-define(EMQX_BRIDGE_RESOURCE_HRL, true).
|
||||||
|
|
||||||
|
-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -15,6 +15,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge_resource).
|
-module(emqx_bridge_resource).
|
||||||
|
|
||||||
|
-include("emqx_bridge_resource.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
|
@ -23,7 +24,9 @@
|
||||||
resource_id/1,
|
resource_id/1,
|
||||||
resource_id/2,
|
resource_id/2,
|
||||||
bridge_id/2,
|
bridge_id/2,
|
||||||
parse_bridge_id/1
|
parse_bridge_id/1,
|
||||||
|
bridge_hookpoint/1,
|
||||||
|
bridge_hookpoint_to_bridge_id/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -89,6 +92,14 @@ parse_bridge_id(BridgeId) ->
|
||||||
)
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
bridge_hookpoint(BridgeId) ->
|
||||||
|
<<"$bridges/", (bin(BridgeId))/binary>>.
|
||||||
|
|
||||||
|
bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
|
||||||
|
{ok, BridgeId};
|
||||||
|
bridge_hookpoint_to_bridge_id(_) ->
|
||||||
|
{error, bad_bridge_hookpoint}.
|
||||||
|
|
||||||
validate_name(Name0) ->
|
validate_name(Name0) ->
|
||||||
Name = unicode:characters_to_list(Name0, utf8),
|
Name = unicode:characters_to_list(Name0, utf8),
|
||||||
case is_list(Name) andalso Name =/= [] of
|
case is_list(Name) andalso Name =/= [] of
|
||||||
|
@ -308,7 +319,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
|
||||||
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
|
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
|
||||||
%% receives a message from the external database.
|
%% receives a message from the external database.
|
||||||
BId = bridge_id(Type, Name),
|
BId = bridge_id(Type, Name),
|
||||||
Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name};
|
BridgeHookpoint = bridge_hookpoint(BId),
|
||||||
|
Conf#{hookpoint => BridgeHookpoint, bridge_name => Name};
|
||||||
%% TODO: rename this to `kafka_producer' after alias support is added
|
%% TODO: rename this to `kafka_producer' after alias support is added
|
||||||
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
||||||
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
|
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_bridge_resource_tests).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
bridge_hookpoint_test_() ->
|
||||||
|
BridgeId = emqx_bridge_resource:bridge_id(type, name),
|
||||||
|
BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId),
|
||||||
|
[
|
||||||
|
?_assertEqual(<<"$bridges/type:name">>, BridgeHookpoint),
|
||||||
|
?_assertEqual(
|
||||||
|
{ok, BridgeId},
|
||||||
|
emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeHookpoint)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{error, bad_bridge_hookpoint},
|
||||||
|
emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeId)
|
||||||
|
)
|
||||||
|
].
|
|
@ -213,11 +213,12 @@ get_rules_with_same_event(Topic) ->
|
||||||
].
|
].
|
||||||
|
|
||||||
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
|
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
|
||||||
get_rule_ids_by_action(ActionName) when is_binary(ActionName) ->
|
get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) ->
|
||||||
[
|
[
|
||||||
Id
|
Id
|
||||||
|| #{actions := Acts, id := Id} <- get_rules(),
|
|| #{actions := Acts, id := Id, from := Froms} <- get_rules(),
|
||||||
lists:any(fun(A) -> A =:= ActionName end, Acts)
|
forwards_to_bridge(Acts, BridgeId) orelse
|
||||||
|
references_ingress_bridge(Froms, BridgeId)
|
||||||
];
|
];
|
||||||
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
|
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
|
||||||
{Mod, Fun} =
|
{Mod, Fun} =
|
||||||
|
@ -317,8 +318,14 @@ get_basic_usage_info() ->
|
||||||
NumRules = length(EnabledRules),
|
NumRules = length(EnabledRules),
|
||||||
ReferencedBridges =
|
ReferencedBridges =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{actions := Actions, from := From}, Acc) ->
|
fun(#{actions := Actions, from := Froms}, Acc) ->
|
||||||
BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From],
|
BridgeIDs0 =
|
||||||
|
[
|
||||||
|
BridgeID
|
||||||
|
|| From <- Froms,
|
||||||
|
{ok, BridgeID} <-
|
||||||
|
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
|
||||||
|
],
|
||||||
BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
|
BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
|
||||||
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
|
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
|
||||||
end,
|
end,
|
||||||
|
@ -478,3 +485,19 @@ contains_actions(Actions, Mod0, Func0) ->
|
||||||
end,
|
end,
|
||||||
Actions
|
Actions
|
||||||
).
|
).
|
||||||
|
|
||||||
|
forwards_to_bridge(Actions, BridgeId) ->
|
||||||
|
lists:any(fun(A) -> A =:= BridgeId end, Actions).
|
||||||
|
|
||||||
|
references_ingress_bridge(Froms, BridgeId) ->
|
||||||
|
lists:any(
|
||||||
|
fun(ReferenceBridgeId) ->
|
||||||
|
BridgeId =:= ReferenceBridgeId
|
||||||
|
end,
|
||||||
|
[
|
||||||
|
RefBridgeId
|
||||||
|
|| From <- Froms,
|
||||||
|
{ok, RefBridgeId} <-
|
||||||
|
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
-include_lib("emqx_bridge/include/emqx_bridge_resource.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
reload/0,
|
reload/0,
|
||||||
|
@ -1011,7 +1012,7 @@ hook_fun_name(HookPoint) ->
|
||||||
HookFunName.
|
HookFunName.
|
||||||
|
|
||||||
%% return static function references to help static code checks
|
%% return static function references to help static code checks
|
||||||
hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2;
|
hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2;
|
||||||
hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
|
hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
|
||||||
hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
|
hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
|
||||||
hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
|
hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
|
||||||
|
@ -1034,7 +1035,7 @@ ntoa(undefined) -> undefined;
|
||||||
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
|
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
|
||||||
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
|
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
|
||||||
|
|
||||||
event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
|
event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge;
|
||||||
event_name(<<"$events/client_connected">>) -> 'client.connected';
|
event_name(<<"$events/client_connected">>) -> 'client.connected';
|
||||||
event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
|
event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
|
||||||
event_name(<<"$events/client_connack">>) -> 'client.connack';
|
event_name(<<"$events/client_connack">>) -> 'client.connack';
|
||||||
|
@ -1047,7 +1048,7 @@ event_name(<<"$events/message_dropped">>) -> 'message.dropped';
|
||||||
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
|
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
|
||||||
event_name(_) -> 'message.publish'.
|
event_name(_) -> 'message.publish'.
|
||||||
|
|
||||||
event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
|
event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge;
|
||||||
event_topic('client.connected') -> <<"$events/client_connected">>;
|
event_topic('client.connected') -> <<"$events/client_connected">>;
|
||||||
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
|
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
|
||||||
event_topic('client.connack') -> <<"$events/client_connack">>;
|
event_topic('client.connack') -> <<"$events/client_connack">>;
|
||||||
|
|
|
@ -25,6 +25,8 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
||||||
-define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)).
|
-define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)).
|
||||||
|
|
||||||
|
@ -198,8 +200,11 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
|
|
||||||
end_per_testcase(t_events, Config) ->
|
end_per_testcase(t_events, Config) ->
|
||||||
ets:delete(events_record_tab),
|
ets:delete(events_record_tab),
|
||||||
ok = delete_rule(?config(hook_points_rules, Config));
|
ok = delete_rule(?config(hook_points_rules, Config)),
|
||||||
|
emqx_common_test_helpers:call_janitor(),
|
||||||
|
ok;
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
emqx_common_test_helpers:call_janitor(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -2683,6 +2688,24 @@ t_get_basic_usage_info_1(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_get_rule_ids_by_action_reference_ingress_bridge(_Config) ->
|
||||||
|
BridgeId = <<"mqtt:ingress">>,
|
||||||
|
RuleId = <<"rule:ingress_bridge_referenced">>,
|
||||||
|
{ok, _} =
|
||||||
|
emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
id => RuleId,
|
||||||
|
sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>,
|
||||||
|
actions => [#{function => console}]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
||||||
|
?assertMatch(
|
||||||
|
[RuleId],
|
||||||
|
emqx_rule_engine:get_rule_ids_by_action(BridgeId)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal helpers
|
%% Internal helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Consider bridges referenced in `FROM` rule clauses as dependencies.
|
||||||
|
|
||||||
|
Before this fix, when one tried to delete an ingress rule referenced in an action like `select * from "$bridges/mqtt:ingress"`, the UI would not trigger a warning about dependent rule actions.
|
Loading…
Reference in New Issue