662 lines
21 KiB
Erlang
662 lines
21 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_engine).
|
|
|
|
-behaviour(gen_server).
|
|
-behaviour(emqx_config_handler).
|
|
-behaviour(emqx_config_backup).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-export([start_link/0]).
|
|
|
|
-export([
|
|
post_config_update/5
|
|
]).
|
|
|
|
%% Rule Management
|
|
|
|
-export([load_rules/0]).
|
|
|
|
-export([
|
|
create_rule/1,
|
|
update_rule/1,
|
|
delete_rule/1,
|
|
get_rule/1
|
|
]).
|
|
|
|
-export([
|
|
get_rules/0,
|
|
get_rules_for_topic/1,
|
|
get_rules_with_same_event/1,
|
|
get_rule_ids_by_action/1,
|
|
ensure_action_removed/2,
|
|
get_rules_ordered_by_ts/0
|
|
]).
|
|
|
|
-export([
|
|
load_hooks_for_rule/1,
|
|
unload_hooks_for_rule/1,
|
|
maybe_add_metrics_for_rule/1,
|
|
clear_metrics_for_rule/1,
|
|
reset_metrics_for_rule/1
|
|
]).
|
|
|
|
%% exported for `emqx_telemetry'
|
|
-export([get_basic_usage_info/0]).
|
|
|
|
-export([now_ms/0]).
|
|
|
|
%% gen_server Callbacks
|
|
-export([
|
|
init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
%% Data backup
|
|
-export([
|
|
import_config/1
|
|
]).
|
|
|
|
%% For setting and getting extra rule engine SQL functions module
|
|
-export([
|
|
extra_functions_module/0,
|
|
set_extra_functions_module/1
|
|
]).
|
|
|
|
-define(RULE_ENGINE, ?MODULE).
|
|
|
|
-define(T_CALL, infinity).
|
|
|
|
%% NOTE: This order cannot be changed! This is to make the metric working during relup.
|
|
%% Append elements to this list to add new metrics.
|
|
-define(METRICS, [
|
|
'matched',
|
|
'passed',
|
|
'failed',
|
|
'failed.exception',
|
|
'failed.no_result',
|
|
'actions.total',
|
|
'actions.success',
|
|
'actions.failed',
|
|
'actions.failed.out_of_service',
|
|
'actions.failed.unknown'
|
|
]).
|
|
|
|
-define(RATE_METRICS, ['matched']).
|
|
|
|
-type action_name() :: binary() | #{function := binary()}.
|
|
|
|
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
|
|
start_link() ->
|
|
gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% The config handler for emqx_rule_engine
|
|
%%------------------------------------------------------------------------------
|
|
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
|
|
create_rule(NewRule#{id => bin(RuleId)});
|
|
post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
|
|
delete_rule(bin(RuleId));
|
|
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
|
|
update_rule(NewRule#{id => bin(RuleId)});
|
|
post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
emqx_utils_maps:diff_maps(NewRules, OldRules),
|
|
try
|
|
maps_foreach(
|
|
fun({Id, {_Old, New}}) ->
|
|
{ok, _} = update_rule(New#{id => bin(Id)})
|
|
end,
|
|
Updated
|
|
),
|
|
maps_foreach(
|
|
fun({Id, _Rule}) ->
|
|
ok = delete_rule(bin(Id))
|
|
end,
|
|
Removed
|
|
),
|
|
maps_foreach(
|
|
fun({Id, Rule}) ->
|
|
{ok, _} = create_rule(Rule#{id => bin(Id)})
|
|
end,
|
|
Added
|
|
),
|
|
ok
|
|
catch
|
|
throw:#{kind := _} = Error ->
|
|
{error, Error}
|
|
end.
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% APIs for rules
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
-spec load_rules() -> ok.
|
|
load_rules() ->
|
|
maps_foreach(
|
|
fun
|
|
({Id, #{metadata := #{created_at := CreatedAt}} = Rule}) ->
|
|
create_rule(Rule#{id => bin(Id)}, CreatedAt);
|
|
({Id, Rule}) ->
|
|
create_rule(Rule#{id => bin(Id)})
|
|
end,
|
|
emqx:get_config([rule_engine, rules], #{})
|
|
).
|
|
|
|
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
create_rule(Params) ->
|
|
create_rule(Params, maps:get(created_at, Params, now_ms())).
|
|
|
|
create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
not_found -> with_parsed_rule(Params, CreatedAt, fun insert_rule/1);
|
|
{ok, _} -> {error, already_exists}
|
|
end.
|
|
|
|
-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
not_found ->
|
|
{error, not_found};
|
|
{ok, RulePrev = #{created_at := CreatedAt}} ->
|
|
with_parsed_rule(Params, CreatedAt, fun(Rule) -> update_rule(Rule, RulePrev) end)
|
|
end.
|
|
|
|
-spec delete_rule(RuleId :: rule_id()) -> ok.
|
|
delete_rule(RuleId) when is_binary(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
not_found ->
|
|
ok;
|
|
{ok, Rule} ->
|
|
gen_server:call(?RULE_ENGINE, {delete_rule, Rule}, ?T_CALL)
|
|
end.
|
|
|
|
-spec insert_rule(Rule :: rule()) -> ok.
|
|
insert_rule(Rule) ->
|
|
gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
|
|
|
|
-spec update_rule(Rule :: rule(), RulePrev :: rule()) -> ok.
|
|
update_rule(Rule, RulePrev) ->
|
|
gen_server:call(?RULE_ENGINE, {update_rule, Rule, RulePrev}, ?T_CALL).
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% Rule Management
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
-spec get_rules() -> [rule()].
|
|
get_rules() ->
|
|
get_all_records(?RULE_TAB).
|
|
|
|
get_rules_ordered_by_ts() ->
|
|
lists:sort(
|
|
fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
|
|
CreatedA =< CreatedB
|
|
end,
|
|
get_rules()
|
|
).
|
|
|
|
-spec get_rules_for_topic(Topic :: binary()) -> [rule()].
|
|
get_rules_for_topic(Topic) ->
|
|
[
|
|
Rule
|
|
|| M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]),
|
|
Rule <- lookup_rule(emqx_topic_index:get_id(M))
|
|
].
|
|
|
|
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
|
|
get_rules_with_same_event(Topic) ->
|
|
EventName = emqx_rule_events:event_name(Topic),
|
|
[
|
|
Rule
|
|
|| Rule = #{from := From} <- get_rules(),
|
|
lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
|
|
].
|
|
|
|
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
|
|
get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) ->
|
|
[
|
|
Id
|
|
|| #{actions := Acts, id := Id, from := Froms} <- get_rules(),
|
|
forwards_to_bridge(Acts, BridgeId) orelse
|
|
references_ingress_bridge(Froms, BridgeId)
|
|
];
|
|
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
|
|
{Mod, Fun} =
|
|
case string:split(FuncName, ":", leading) of
|
|
[M, F] -> {binary_to_module(M), F};
|
|
[F] -> {emqx_rule_actions, F}
|
|
end,
|
|
[
|
|
Id
|
|
|| #{actions := Acts, id := Id} <- get_rules(),
|
|
contains_actions(Acts, Mod, Fun)
|
|
].
|
|
|
|
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
|
|
ensure_action_removed(RuleId, ActionName) ->
|
|
FilterFunc =
|
|
fun
|
|
(Func, Func) -> false;
|
|
(#{<<"function">> := Func}, #{function := Func}) -> false;
|
|
(_, _) -> true
|
|
end,
|
|
case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
|
|
not_found ->
|
|
ok;
|
|
#{<<"actions">> := Acts} = Conf ->
|
|
NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
|
|
{ok, _} = emqx_conf:update(
|
|
?RULE_PATH(RuleId),
|
|
Conf#{<<"actions">> => NewActs},
|
|
#{override_to => cluster}
|
|
),
|
|
ok
|
|
end.
|
|
|
|
is_of_event_name(EventName, Topic) ->
|
|
EventName =:= emqx_rule_events:event_name(Topic).
|
|
|
|
-spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
|
|
get_rule(Id) ->
|
|
case lookup_rule(Id) of
|
|
[Rule] -> {ok, Rule};
|
|
[] -> not_found
|
|
end.
|
|
|
|
lookup_rule(Id) ->
|
|
[Rule || {_Id, Rule} <- ets:lookup(?RULE_TAB, Id)].
|
|
|
|
load_hooks_for_rule(#{from := Topics}) ->
|
|
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
|
|
|
maybe_add_metrics_for_rule(Id) ->
|
|
case emqx_metrics_worker:has_metrics(rule_metrics, Id) of
|
|
true ->
|
|
ok = reset_metrics_for_rule(Id);
|
|
false ->
|
|
ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS)
|
|
end.
|
|
|
|
clear_metrics_for_rule(Id) ->
|
|
ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
|
|
|
|
-spec reset_metrics_for_rule(rule_id()) -> ok.
|
|
reset_metrics_for_rule(Id) ->
|
|
emqx_metrics_worker:reset_metrics(rule_metrics, Id).
|
|
|
|
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
|
lists:foreach(
|
|
fun(Topic) ->
|
|
case get_rules_with_same_event(Topic) of
|
|
%% we are now deleting the last rule
|
|
[#{id := Id0}] when Id0 == Id ->
|
|
emqx_rule_events:unload(Topic);
|
|
_ ->
|
|
ok
|
|
end
|
|
end,
|
|
Topics
|
|
).
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% Telemetry helper functions
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
-spec get_basic_usage_info() ->
|
|
#{
|
|
num_rules => non_neg_integer(),
|
|
referenced_bridges =>
|
|
#{BridgeType => non_neg_integer()}
|
|
}
|
|
when
|
|
BridgeType :: atom().
|
|
get_basic_usage_info() ->
|
|
try
|
|
Rules = get_rules(),
|
|
EnabledRules =
|
|
lists:filter(
|
|
fun(#{enable := Enabled}) -> Enabled end,
|
|
Rules
|
|
),
|
|
NumRules = length(EnabledRules),
|
|
ReferencedBridges =
|
|
lists:foldl(
|
|
fun(#{actions := Actions, from := Froms}, Acc) ->
|
|
BridgeIDs0 = get_referenced_hookpoints(Froms),
|
|
BridgeIDs1 = get_egress_bridges(Actions),
|
|
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
|
|
end,
|
|
#{},
|
|
EnabledRules
|
|
),
|
|
#{
|
|
num_rules => NumRules,
|
|
referenced_bridges => ReferencedBridges
|
|
}
|
|
catch
|
|
_:_ ->
|
|
#{
|
|
num_rules => 0,
|
|
referenced_bridges => #{}
|
|
}
|
|
end.
|
|
|
|
tally_referenced_bridges(BridgeIDs, Acc0) ->
|
|
lists:foldl(
|
|
fun(BridgeID, Acc) ->
|
|
{BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(
|
|
BridgeID,
|
|
#{atom_name => false}
|
|
),
|
|
maps:update_with(
|
|
BridgeType,
|
|
fun(X) -> X + 1 end,
|
|
1,
|
|
Acc
|
|
)
|
|
end,
|
|
Acc0,
|
|
BridgeIDs
|
|
).
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% Data backup
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) ->
|
|
OldRules = emqx:get_raw_config(?KEY_PATH, #{}),
|
|
RuleEngineConf1 = RuleEngineConf#{<<"rules">> => maps:merge(OldRules, NewRules)},
|
|
case emqx_conf:update([rule_engine], RuleEngineConf1, #{override_to => cluster}) of
|
|
{ok, #{raw_config := #{<<"rules">> := NewRawRules}}} ->
|
|
Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawRules, OldRules)),
|
|
ChangedPaths = [?RULE_PATH(Id) || Id <- maps:keys(Changed)],
|
|
{ok, #{root_key => rule_engine, changed => ChangedPaths}};
|
|
Error ->
|
|
{error, #{root_key => rule_engine, reason => Error}}
|
|
end;
|
|
import_config(_RawConf) ->
|
|
{ok, #{root_key => rule_engine, changed => []}}.
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
_TableId = ets:new(?KV_TAB, [
|
|
named_table,
|
|
set,
|
|
public,
|
|
{write_concurrency, true},
|
|
{read_concurrency, true}
|
|
]),
|
|
ok = emqx_conf:add_handler(
|
|
[rule_engine, jq_implementation_module],
|
|
emqx_rule_engine_schema
|
|
),
|
|
{ok, #{}}.
|
|
|
|
handle_call({insert_rule, Rule}, _From, State) ->
|
|
ok = do_insert_rule(Rule),
|
|
ok = do_update_rule_index(Rule),
|
|
{reply, ok, State};
|
|
handle_call({update_rule, Rule, RulePrev}, _From, State) ->
|
|
ok = do_delete_rule_index(RulePrev),
|
|
ok = do_insert_rule(Rule),
|
|
ok = do_update_rule_index(Rule),
|
|
{reply, ok, State};
|
|
handle_call({delete_rule, Rule}, _From, State) ->
|
|
ok = do_delete_rule_index(Rule),
|
|
ok = do_delete_rule(Rule),
|
|
{reply, ok, State};
|
|
handle_call(Req, _From, State) ->
|
|
?SLOG(error, #{msg => "unexpected_call", request => Req}),
|
|
{reply, ignored, State}.
|
|
|
|
handle_cast(Msg, State) ->
|
|
?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
|
|
{noreply, State}.
|
|
|
|
handle_info(Info, State) ->
|
|
?SLOG(error, #{msg => "unexpected_info", request => Info}),
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% Internal Functions
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt, Fun) ->
|
|
case emqx_rule_sqlparser:parse(Sql) of
|
|
{ok, Select} ->
|
|
Rule0 = #{
|
|
id => RuleId,
|
|
name => maps:get(name, Params, <<"">>),
|
|
created_at => CreatedAt,
|
|
updated_at => now_ms(),
|
|
sql => Sql,
|
|
actions => parse_actions(Actions),
|
|
description => maps:get(description, Params, ""),
|
|
%% -- calculated fields:
|
|
from => emqx_rule_sqlparser:select_from(Select),
|
|
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
|
|
fields => emqx_rule_sqlparser:select_fields(Select),
|
|
doeach => emqx_rule_sqlparser:select_doeach(Select),
|
|
incase => emqx_rule_sqlparser:select_incase(Select),
|
|
conditions => emqx_rule_sqlparser:select_where(Select)
|
|
%% -- calculated fields end
|
|
},
|
|
InputEnable = maps:get(enable, Params, true),
|
|
case validate_bridge_existence_in_actions(Rule0) of
|
|
ok ->
|
|
ok;
|
|
{error, NonExistentBridgeIDs} ->
|
|
?tp(error, "action_references_nonexistent_bridges", #{
|
|
rule_id => RuleId,
|
|
nonexistent_bridge_ids => NonExistentBridgeIDs,
|
|
hint => "this rule will be disabled"
|
|
})
|
|
end,
|
|
Rule = Rule0#{enable => InputEnable},
|
|
ok = Fun(Rule),
|
|
{ok, Rule};
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
do_insert_rule(#{id := Id} = Rule) ->
|
|
ok = load_hooks_for_rule(Rule),
|
|
ok = maybe_add_metrics_for_rule(Id),
|
|
true = ets:insert(?RULE_TAB, {Id, Rule}),
|
|
ok.
|
|
|
|
do_delete_rule(#{id := Id} = Rule) ->
|
|
ok = unload_hooks_for_rule(Rule),
|
|
ok = clear_metrics_for_rule(Id),
|
|
true = ets:delete(?RULE_TAB, Id),
|
|
ok.
|
|
|
|
do_update_rule_index(#{id := Id, from := From}) ->
|
|
ok = lists:foreach(
|
|
fun(Topic) ->
|
|
true = emqx_topic_index:insert(Topic, Id, [], ?RULE_TOPIC_INDEX)
|
|
end,
|
|
From
|
|
).
|
|
|
|
do_delete_rule_index(#{id := Id, from := From}) ->
|
|
ok = lists:foreach(
|
|
fun(Topic) ->
|
|
true = emqx_topic_index:delete(Topic, Id, ?RULE_TOPIC_INDEX)
|
|
end,
|
|
From
|
|
).
|
|
|
|
parse_actions(Actions) ->
|
|
[do_parse_action(Act) || Act <- Actions].
|
|
|
|
do_parse_action(Action) ->
|
|
emqx_rule_actions:parse_action(Action).
|
|
|
|
get_all_records(Tab) ->
|
|
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
|
|
|
|
maps_foreach(Fun, Map) ->
|
|
lists:foreach(Fun, maps:to_list(Map)).
|
|
|
|
now_ms() ->
|
|
erlang:system_time(millisecond).
|
|
|
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
|
bin(B) when is_binary(B) -> B.
|
|
|
|
binary_to_module(ModName) ->
|
|
try
|
|
binary_to_existing_atom(ModName, utf8)
|
|
catch
|
|
error:badarg ->
|
|
not_exist_mod
|
|
end.
|
|
|
|
contains_actions(Actions, Mod0, Func0) ->
|
|
lists:any(
|
|
fun
|
|
(#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true;
|
|
(_) -> false
|
|
end,
|
|
Actions
|
|
).
|
|
|
|
forwards_to_bridge(Actions, BridgeId) ->
|
|
Action = do_parse_action(BridgeId),
|
|
lists:any(fun(A) -> A =:= Action end, Actions).
|
|
|
|
references_ingress_bridge(Froms, BridgeId) ->
|
|
lists:member(
|
|
BridgeId,
|
|
[
|
|
RefBridgeId
|
|
|| From <- Froms,
|
|
{ok, RefBridgeId} <-
|
|
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
|
|
]
|
|
).
|
|
|
|
get_referenced_hookpoints(Froms) ->
|
|
[
|
|
BridgeID
|
|
|| From <- Froms,
|
|
{ok, BridgeID} <-
|
|
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
|
|
].
|
|
|
|
get_egress_bridges(Actions) ->
|
|
lists:foldr(
|
|
fun
|
|
({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
|
|
[emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
|
|
({bridge_v2, BridgeType, BridgeName}, Acc) ->
|
|
[emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
|
|
(_, Acc) ->
|
|
Acc
|
|
end,
|
|
[],
|
|
Actions
|
|
).
|
|
|
|
%% For allowing an external application to add extra "built-in" functions to the
|
|
%% rule engine SQL like language. The module set by
|
|
%% set_extra_functions_module/1 should export a function called
|
|
%% handle_rule_function with two parameters (the first being an atom for the
|
|
%% the function name and the second a list of arguments). The function should
|
|
%% should return the result or {error, no_match_for_function} if it cannot
|
|
%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs
|
|
%% module. See also callback function declaration in emqx_rule_funcs.erl.
|
|
|
|
-spec extra_functions_module() -> module() | undefined.
|
|
extra_functions_module() ->
|
|
persistent_term:get({?MODULE, extra_functions}, undefined).
|
|
|
|
-spec set_extra_functions_module(module()) -> ok.
|
|
set_extra_functions_module(Mod) ->
|
|
persistent_term:put({?MODULE, extra_functions}, Mod),
|
|
ok.
|
|
|
|
%% Checks whether the referenced bridges in actions all exist. If there are non-existent
|
|
%% ones, the rule shouldn't be allowed to be enabled.
|
|
%% The actions here are already parsed.
|
|
validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rule) ->
|
|
BridgeIDs0 =
|
|
lists:map(
|
|
fun(BridgeID) ->
|
|
%% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it
|
|
%% returns v1 types when attempting to "upgrade".....
|
|
{Type, Name} =
|
|
emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
|
|
case emqx_action_info:is_action_type(Type) of
|
|
true -> {source, Type, Name};
|
|
false -> {bridge_v1, Type, Name}
|
|
end
|
|
end,
|
|
get_referenced_hookpoints(Froms)
|
|
),
|
|
BridgeIDs1 =
|
|
lists:filtermap(
|
|
fun
|
|
({bridge_v2, Type, Name}) -> {true, {action, Type, Name}};
|
|
({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}};
|
|
(_) -> false
|
|
end,
|
|
Actions
|
|
),
|
|
NonExistentBridgeIDs =
|
|
lists:filter(
|
|
fun({Kind, Type, Name}) ->
|
|
LookupFn =
|
|
case Kind of
|
|
action -> fun emqx_bridge_v2:lookup_action/2;
|
|
source -> fun emqx_bridge_v2:lookup_source/2;
|
|
bridge_v1 -> fun emqx_bridge:lookup/2
|
|
end,
|
|
try
|
|
case LookupFn(Type, Name) of
|
|
{ok, _} -> false;
|
|
{error, _} -> true
|
|
end
|
|
catch
|
|
_:_ -> true
|
|
end
|
|
end,
|
|
BridgeIDs0 ++ BridgeIDs1
|
|
),
|
|
case NonExistentBridgeIDs of
|
|
[] -> ok;
|
|
_ -> {error, #{nonexistent_bridge_ids => NonExistentBridgeIDs}}
|
|
end.
|