%% emqx/apps/emqx_rule_engine/src/emqx_rule_engine.erl
%%
%% 662 lines
%% 21 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine).
-behaviour(gen_server).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([start_link/0]).
-export([
post_config_update/5
]).
%% Rule Management
-export([load_rules/0]).
-export([
create_rule/1,
update_rule/1,
delete_rule/1,
get_rule/1
]).
-export([
get_rules/0,
get_rules_for_topic/1,
get_rules_with_same_event/1,
get_rule_ids_by_action/1,
ensure_action_removed/2,
get_rules_ordered_by_ts/0
]).
-export([
load_hooks_for_rule/1,
unload_hooks_for_rule/1,
maybe_add_metrics_for_rule/1,
clear_metrics_for_rule/1,
reset_metrics_for_rule/1
]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
-export([now_ms/0]).
%% gen_server Callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% Data backup
-export([
import_config/1
]).
%% For setting and getting extra rule engine SQL functions module
-export([
extra_functions_module/0,
set_extra_functions_module/1
]).
-define(RULE_ENGINE, ?MODULE).
-define(T_CALL, infinity).
%% NOTE: This order cannot be changed! It keeps the metrics working during relup.
%% Append elements to this list to add new metrics.
-define(METRICS, [
'matched',
'passed',
'failed',
'failed.exception',
'failed.no_result',
'actions.total',
'actions.success',
'actions.failed',
'actions.failed.out_of_service',
'actions.failed.unknown'
]).
-define(RATE_METRICS, ['matched']).
-type action_name() :: binary() | #{function := binary()}.
%% Starts the rule engine server and registers it locally as ?RULE_ENGINE.
%% The callback module is this module; no init arguments or gen_server
%% options are passed.
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?RULE_ENGINE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%----------------------------------------------------------------------------------------
%% The config handler for emqx_rule_engine
%%------------------------------------------------------------------------------
%% Config-handler callback run after the rule engine configuration changed.
%%
%% Paths matched by ?RULE_PATH(RuleId):
%%   - old value is `undefined'                  -> create the rule;
%%   - request '$remove', new value `undefined'  -> delete the rule;
%%   - anything else                             -> update the rule.
%% Root path [rule_engine]: the new and old rule maps are diffed, then the
%% changed rules are updated, the removed rules deleted and the added rules
%% created, in that order. A thrown map carrying a `kind' key is returned as
%% {error, Map}; any other failure (including a failed match on the result of
%% a rule operation) propagates to the caller.
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
    create_rule(NewRule#{id => bin(RuleId)});
post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
    delete_rule(bin(RuleId));
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
    update_rule(NewRule#{id => bin(RuleId)});
post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
    #{added := ToCreate, removed := ToDelete, changed := ToUpdate} =
        emqx_utils_maps:diff_maps(NewRules, OldRules),
    %% Each handler asserts success of the underlying rule operation.
    UpdateOne = fun({Id, {_Prev, Next}}) -> {ok, _} = update_rule(Next#{id => bin(Id)}) end,
    DeleteOne = fun({Id, _Rule}) -> ok = delete_rule(bin(Id)) end,
    CreateOne = fun({Id, Rule}) -> {ok, _} = create_rule(Rule#{id => bin(Id)}) end,
    try
        maps_foreach(UpdateOne, ToUpdate),
        maps_foreach(DeleteOne, ToDelete),
        maps_foreach(CreateOne, ToCreate),
        ok
    catch
        throw:#{kind := _} = Error ->
            {error, Error}
    end.
%%----------------------------------------------------------------------------------------
%% APIs for rules
%%----------------------------------------------------------------------------------------
%% Creates every rule found under the [rule_engine, rules] config (an empty
%% map when nothing is configured). A rule whose config carries
%% metadata.created_at is created with that timestamp; otherwise
%% create_rule/1 chooses the creation time. Results of the individual
%% create_rule calls are not inspected.
-spec load_rules() -> ok.
load_rules() ->
    LoadOne =
        fun
            ({Id, #{metadata := #{created_at := CreatedAt}} = RuleConf}) ->
                create_rule(RuleConf#{id => bin(Id)}, CreatedAt);
            ({Id, RuleConf}) ->
                create_rule(RuleConf#{id => bin(Id)})
        end,
    maps_foreach(LoadOne, emqx:get_config([rule_engine, rules], #{})).
%% Creates a rule from Params. The creation timestamp is taken from the
%% `created_at' key of Params when present, otherwise from now_ms().
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params) ->
    CreatedAt = maps:get(created_at, Params, now_ms()),
    create_rule(Params, CreatedAt).
%% Creates a rule with an explicit creation timestamp. Returns
%% {error, already_exists} when a rule with the same (binary) id is already
%% stored; otherwise parses the rule and inserts it through the server.
create_rule(#{id := RuleId} = Params, CreatedAt) when is_binary(RuleId) ->
    case get_rule(RuleId) of
        {ok, _Existing} ->
            {error, already_exists};
        not_found ->
            with_parsed_rule(Params, CreatedAt, fun insert_rule/1)
    end.
%% Replaces an existing rule with the one described by Params. The original
%% `created_at' of the stored rule is preserved. Returns {error, not_found}
%% when no rule with the given (binary) id exists.
-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
update_rule(#{id := RuleId} = Params) when is_binary(RuleId) ->
    case get_rule(RuleId) of
        {ok, #{created_at := CreatedAt} = Previous} ->
            Apply = fun(Parsed) -> update_rule(Parsed, Previous) end,
            with_parsed_rule(Params, CreatedAt, Apply);
        not_found ->
            {error, not_found}
    end.
%% Deletes the rule with the given id via the rule engine server.
%% Deleting an unknown rule is a no-op that still returns ok.
-spec delete_rule(RuleId :: rule_id()) -> ok.
delete_rule(RuleId) when is_binary(RuleId) ->
    case get_rule(RuleId) of
        {ok, Rule} ->
            gen_server:call(?RULE_ENGINE, {delete_rule, Rule}, ?T_CALL);
        not_found ->
            ok
    end.
%% Asks the rule engine server to store a parsed rule. The call uses the
%% ?T_CALL timeout (infinity).
-spec insert_rule(Rule :: rule()) -> ok.
insert_rule(Rule) ->
    Request = {insert_rule, Rule},
    gen_server:call(?RULE_ENGINE, Request, ?T_CALL).
%% Asks the rule engine server to replace RulePrev with Rule. The call uses
%% the ?T_CALL timeout (infinity).
-spec update_rule(Rule :: rule(), RulePrev :: rule()) -> ok.
update_rule(Rule, RulePrev) ->
    Request = {update_rule, Rule, RulePrev},
    gen_server:call(?RULE_ENGINE, Request, ?T_CALL).
%%----------------------------------------------------------------------------------------
%% Rule Management
%%----------------------------------------------------------------------------------------
%% Returns all rules currently stored in the ?RULE_TAB table.
-spec get_rules() -> [rule()].
get_rules() ->
    Table = ?RULE_TAB,
    get_all_records(Table).
%% Returns all rules sorted by ascending `created_at'.
get_rules_ordered_by_ts() ->
    CreatedNoLater =
        fun(#{created_at := TsA}, #{created_at := TsB}) -> TsA =< TsB end,
    lists:sort(CreatedNoLater, get_rules()).
%% Returns the rules whose indexed topics match Topic. Matches come from the
%% ?RULE_TOPIC_INDEX topic index (queried with the `unique' option); each
%% match's id is looked up in the rule table, and ids that are no longer
%% stored contribute nothing.
-spec get_rules_for_topic(Topic :: binary()) -> [rule()].
get_rules_for_topic(Topic) ->
    Matches = emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]),
    [
        Rule
     || Match <- Matches,
        Rule <- lookup_rule(emqx_topic_index:get_id(Match))
    ].
%% Returns every stored rule that has at least one `from' topic whose event
%% name (per emqx_rule_events:event_name/1) equals the event name of Topic.
%% Records without a `from' key are skipped.
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
get_rules_with_same_event(Topic) ->
    EventName = emqx_rule_events:event_name(Topic),
    SameEvent = fun(T) -> is_of_event_name(EventName, T) end,
    lists:filter(
        fun
            (#{from := From}) -> lists:any(SameEvent, From);
            (_NoFrom) -> false
        end,
        get_rules()
    ).
%% Returns the ids of the rules that use a given action.
%%
%% Bridge id (binary): rules that forward to the bridge in their actions, or
%% that reference it through a `from' hookpoint.
%% #{function := Name}: Name is either <<"Mod:Fun">> or <<"Fun">> (module then
%% defaults to emqx_rule_actions). The module part is resolved to an existing
%% atom (`not_exist_mod' when there is none); the function part is passed on
%% as the binary produced by string:split/3.
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) ->
    UsesBridge =
        fun(Acts, Froms) ->
            forwards_to_bridge(Acts, BridgeId) orelse
                references_ingress_bridge(Froms, BridgeId)
        end,
    [
        Id
     || #{actions := Acts, id := Id, from := Froms} <- get_rules(),
        UsesBridge(Acts, Froms)
    ];
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
    {Mod, Fun} =
        case string:split(FuncName, ":", leading) of
            [FunOnly] -> {emqx_rule_actions, FunOnly};
            [ModBin, FunBin] -> {binary_to_module(ModBin), FunBin}
        end,
    [
        Id
     || #{actions := Acts, id := Id} <- get_rules(),
        contains_actions(Acts, Mod, Fun)
    ].
%% Removes ActionName from the raw config of rule RuleId and writes the
%% result back cluster-wide. An action entry is dropped when it is equal to
%% ActionName, or when it is a map whose <<"function">> value equals the
%% `function' value of ActionName. Does nothing when the rule has no raw
%% config.
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
ensure_action_removed(RuleId, ActionName) ->
    case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
        #{<<"actions">> := Acts} = Conf ->
            %% true = keep the entry, false = this is the action to remove
            Keep =
                fun
                    (Same, Same) -> false;
                    (#{<<"function">> := Func}, #{function := Func}) -> false;
                    (_, _) -> true
                end,
            Remaining = [Act || Act <- Acts, Keep(Act, ActionName)],
            {ok, _} = emqx_conf:update(
                ?RULE_PATH(RuleId),
                Conf#{<<"actions">> => Remaining},
                #{override_to => cluster}
            ),
            ok;
        not_found ->
            ok
    end.
%% True when Topic maps to exactly EventName via emqx_rule_events:event_name/1.
is_of_event_name(EventName, Topic) ->
    TopicEvent = emqx_rule_events:event_name(Topic),
    EventName =:= TopicEvent.
%% Looks up a single rule by id.
%% Returns {ok, Rule} when exactly one entry is stored, `not_found' when none.
-spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
get_rule(Id) ->
    case lookup_rule(Id) of
        [] -> not_found;
        [Rule] -> {ok, Rule}
    end.
%% Returns the rules stored under Id in ?RULE_TAB as a list (empty when the
%% id is unknown).
lookup_rule(Id) ->
    Entries = ets:lookup(?RULE_TAB, Id),
    [Rule || {_Key, Rule} <- Entries].
%% Calls emqx_rule_events:load/1 for every `from' topic of the rule.
load_hooks_for_rule(#{from := Topics}) ->
    lists:foreach(fun(Topic) -> emqx_rule_events:load(Topic) end, Topics).
%% Makes sure the `rule_metrics' counters exist for rule Id: creates them
%% (with ?METRICS and ?RATE_METRICS) when absent, resets them when they are
%% already present.
maybe_add_metrics_for_rule(Id) ->
    case emqx_metrics_worker:has_metrics(rule_metrics, Id) of
        false ->
            ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS);
        true ->
            ok = reset_metrics_for_rule(Id)
    end.
%% Clears the `rule_metrics' counters of rule Id; crashes unless the metrics
%% worker answers ok.
clear_metrics_for_rule(Id) ->
    ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id),
    ok.
%% Resets the `rule_metrics' counters of rule Id.
-spec reset_metrics_for_rule(rule_id()) -> ok.
reset_metrics_for_rule(Id) ->
    MetricsName = rule_metrics,
    emqx_metrics_worker:reset_metrics(MetricsName, Id).
%% For each `from' topic of the rule, unloads the corresponding event hook
%% via emqx_rule_events:unload/1, but only when this rule is the single
%% remaining rule listening on that event.
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
    UnloadIfLast =
        fun(Topic) ->
            case get_rules_with_same_event(Topic) of
                %% the rule being removed is the last one using this event
                [#{id := OnlyId}] when OnlyId == Id ->
                    emqx_rule_events:unload(Topic);
                _Others ->
                    ok
            end
        end,
    lists:foreach(UnloadIfLast, Topics).
%%----------------------------------------------------------------------------------------
%% Telemetry helper functions
%%----------------------------------------------------------------------------------------
%% Telemetry summary: the number of enabled rules and, per bridge type, how
%% many times enabled rules reference a bridge (through `from' hookpoints or
%% through bridge actions). This is best-effort: any exception raised while
%% collecting yields the zero summary instead of propagating.
-spec get_basic_usage_info() ->
    #{
        num_rules => non_neg_integer(),
        referenced_bridges =>
            #{BridgeType => non_neg_integer()}
    }
when
    BridgeType :: atom().
get_basic_usage_info() ->
    try
        IsEnabled = fun(#{enable := Enabled}) -> Enabled end,
        EnabledRules = lists:filter(IsEnabled, get_rules()),
        CountBridges =
            fun(#{actions := Actions, from := Froms}, Acc) ->
                Ingress = get_referenced_hookpoints(Froms),
                Egress = get_egress_bridges(Actions),
                tally_referenced_bridges(Ingress ++ Egress, Acc)
            end,
        NumRules = length(EnabledRules),
        ReferencedBridges = lists:foldl(CountBridges, #{}, EnabledRules),
        #{
            num_rules => NumRules,
            referenced_bridges => ReferencedBridges
        }
    catch
        _:_ ->
            #{
                num_rules => 0,
                referenced_bridges => #{}
            }
    end.
%% Adds one to the counter of each bridge id's type in Acc0 (starting a new
%% counter at 1). The type is obtained from
%% emqx_bridge_resource:parse_bridge_id/2 called with atom_name => false.
tally_referenced_bridges(BridgeIDs, Acc0) ->
    Increment = fun(Count) -> Count + 1 end,
    CountOne =
        fun(BridgeID, Acc) ->
            {BridgeType, _BridgeName} =
                emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
            maps:update_with(BridgeType, Increment, 1, Acc)
        end,
    lists:foldl(CountOne, Acc0, BridgeIDs).
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
%% Data-backup import callback.
%%
%% When the imported config contains rule engine rules, they are merged over
%% the raw config currently stored at ?KEY_PATH (imported entries win on key
%% clashes) and the merged rule engine config is written cluster-wide. On
%% success the paths of the rules reported as `changed' by the diff against
%% the previous raw rules are returned; any other update result is wrapped
%% as the error reason. Without rule engine rules in the input nothing is
%% changed.
import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) ->
    OldRules = emqx:get_raw_config(?KEY_PATH, #{}),
    MergedRules = maps:merge(OldRules, NewRules),
    MergedConf = RuleEngineConf#{<<"rules">> => MergedRules},
    case emqx_conf:update([rule_engine], MergedConf, #{override_to => cluster}) of
        {ok, #{raw_config := #{<<"rules">> := NewRawRules}}} ->
            Diff = emqx_utils_maps:diff_maps(NewRawRules, OldRules),
            ChangedIds = maps:keys(maps:get(changed, Diff)),
            ChangedPaths = [?RULE_PATH(Id) || Id <- ChangedIds],
            {ok, #{root_key => rule_engine, changed => ChangedPaths}};
        Error ->
            {error, #{root_key => rule_engine, reason => Error}}
    end;
import_config(_RawConf) ->
    {ok, #{root_key => rule_engine, changed => []}}.
%%----------------------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------------------
%% gen_server init: creates the named, public ETS set ?KV_TAB (with read and
%% write concurrency enabled) and registers emqx_rule_engine_schema as the
%% config handler for [rule_engine, jq_implementation_module]. The server
%% state is an empty map.
init([]) ->
    TableOpts = [
        named_table,
        set,
        public,
        {write_concurrency, true},
        {read_concurrency, true}
    ],
    _TableId = ets:new(?KV_TAB, TableOpts),
    HandlerPath = [rule_engine, jq_implementation_module],
    ok = emqx_conf:add_handler(HandlerPath, emqx_rule_engine_schema),
    {ok, #{}}.
%% Synchronous requests. Rule table and topic index mutations are performed
%% here, inside the server process:
%%   {insert_rule, Rule}           - store the rule, then index its topics;
%%   {update_rule, Rule, RulePrev} - drop the previous rule's index entries,
%%                                   store the new rule, then index it;
%%   {delete_rule, Rule}           - drop the index entries, then the rule.
%% Any other request is logged and answered with `ignored'.
handle_call({insert_rule, Rule}, _Caller, St) ->
    ok = do_insert_rule(Rule),
    ok = do_update_rule_index(Rule),
    {reply, ok, St};
handle_call({update_rule, Rule, RulePrev}, _Caller, St) ->
    %% index entries of the old version go first: its topics may differ
    ok = do_delete_rule_index(RulePrev),
    ok = do_insert_rule(Rule),
    ok = do_update_rule_index(Rule),
    {reply, ok, St};
handle_call({delete_rule, Rule}, _Caller, St) ->
    ok = do_delete_rule_index(Rule),
    ok = do_delete_rule(Rule),
    {reply, ok, St};
handle_call(Req, _Caller, St) ->
    ?SLOG(error, #{msg => "unexpected_call", request => Req}),
    {reply, ignored, St}.
%% No casts are expected; anything received is logged at error level and
%% dropped.
handle_cast(Msg, St) ->
    ?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
    {noreply, St}.
%% No plain messages are expected; anything received is logged at error
%% level and dropped.
handle_info(Info, St) ->
    ?SLOG(error, #{msg => "unexpected_info", request => Info}),
    {noreply, St}.
%% gen_server terminate callback; performs no cleanup.
terminate(_Why, _St) ->
    ok.
%% gen_server code_change callback; the state is carried over unchanged.
code_change(_FromVsn, St, _Extra) ->
    {ok, St}.
%%----------------------------------------------------------------------------------------
%% Internal Functions
%%----------------------------------------------------------------------------------------
%% Parses the SQL of Params, builds the full rule map and hands it to Fun.
%%
%% Params must carry `id', `sql' and `actions'; `name', `description' and
%% `enable' are optional (defaults: <<"">>, "" and true). Fun must return ok,
%% otherwise this function crashes with a badmatch.
%% Returns {ok, Rule} on success or the SQL parser's {error, Reason}.
%%
%% References to non-existent bridges are only reported through a trace
%% point; the `enable' value from Params is stored unchanged.
%% NOTE(review): the trace hint says the rule will be disabled, which is not
%% what this code does - confirm which of the two is intended.
with_parsed_rule(#{id := RuleId, sql := Sql, actions := Actions} = Params, CreatedAt, Fun) ->
    case emqx_rule_sqlparser:parse(Sql) of
        {error, Reason} ->
            {error, Reason};
        {ok, Select} ->
            ParsedRule = #{
                id => RuleId,
                name => maps:get(name, Params, <<"">>),
                created_at => CreatedAt,
                updated_at => now_ms(),
                sql => Sql,
                actions => parse_actions(Actions),
                description => maps:get(description, Params, ""),
                %% fields derived from the parsed SELECT statement
                from => emqx_rule_sqlparser:select_from(Select),
                is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
                fields => emqx_rule_sqlparser:select_fields(Select),
                doeach => emqx_rule_sqlparser:select_doeach(Select),
                incase => emqx_rule_sqlparser:select_incase(Select),
                conditions => emqx_rule_sqlparser:select_where(Select)
            },
            EnableFlag = maps:get(enable, Params, true),
            case validate_bridge_existence_in_actions(ParsedRule) of
                ok ->
                    ok;
                {error, NonExistentBridgeIDs} ->
                    ?tp(error, "action_references_nonexistent_bridges", #{
                        rule_id => RuleId,
                        nonexistent_bridge_ids => NonExistentBridgeIDs,
                        hint => "this rule will be disabled"
                    })
            end,
            Rule = ParsedRule#{enable => EnableFlag},
            ok = Fun(Rule),
            {ok, Rule}
    end.
%% Loads the rule's event hooks, ensures its metrics exist (resetting them
%% when they already do), then stores {Id, Rule} in ?RULE_TAB.
do_insert_rule(#{id := RuleId} = Rule) ->
    ok = load_hooks_for_rule(Rule),
    ok = maybe_add_metrics_for_rule(RuleId),
    Entry = {RuleId, Rule},
    true = ets:insert(?RULE_TAB, Entry),
    ok.
%% Unloads the rule's event hooks where it is the last user, clears its
%% metrics, then removes its entry from ?RULE_TAB.
do_delete_rule(#{id := RuleId} = Rule) ->
    ok = unload_hooks_for_rule(Rule),
    ok = clear_metrics_for_rule(RuleId),
    true = ets:delete(?RULE_TAB, RuleId),
    ok.
%% Inserts one ?RULE_TOPIC_INDEX entry (with an empty record) per `from'
%% topic of the rule, keyed by the rule id.
do_update_rule_index(#{id := RuleId, from := Topics}) ->
    IndexTopic =
        fun(Topic) ->
            true = emqx_topic_index:insert(Topic, RuleId, [], ?RULE_TOPIC_INDEX)
        end,
    ok = lists:foreach(IndexTopic, Topics).
%% Removes the ?RULE_TOPIC_INDEX entry of every `from' topic of the rule.
do_delete_rule_index(#{id := RuleId, from := Topics}) ->
    UnindexTopic =
        fun(Topic) ->
            true = emqx_topic_index:delete(Topic, RuleId, ?RULE_TOPIC_INDEX)
        end,
    ok = lists:foreach(UnindexTopic, Topics).
%% Parses every action of a rule, preserving their order.
parse_actions(Actions) ->
    lists:map(fun do_parse_action/1, Actions).
%% Parses a single action by delegating to emqx_rule_actions:parse_action/1.
do_parse_action(RawAction) ->
    emqx_rule_actions:parse_action(RawAction).
%% Returns the value of every {Id, Rule} entry of the ETS table Tab, with the
%% entry key written into the map under `id'.
get_all_records(Tab) ->
    Entries = ets:tab2list(Tab),
    [Rule#{id => Id} || {Id, Rule} <- Entries].
%% Calls Fun once with each {Key, Value} pair of Map; returns ok.
maps_foreach(Fun, Map) ->
    Pairs = maps:to_list(Map),
    lists:foreach(Fun, Pairs).
%% Current Erlang system time in milliseconds.
now_ms() ->
    Unit = millisecond,
    erlang:system_time(Unit).
%% Normalises a rule id to a binary: binaries pass through, atoms are
%% converted using UTF-8. Any other term raises function_clause.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
%% Converts a module name given as a binary into the corresponding atom
%% without creating new atoms. Returns `not_exist_mod' when no such atom
%% exists.
binary_to_module(ModName) ->
    try binary_to_existing_atom(ModName, utf8) of
        Mod -> Mod
    catch
        error:badarg ->
            not_exist_mod
    end.
%% True when at least one element of Actions is a map with `mod' and `func'
%% keys whose `mod' equals Mod0 or whose `func' equals Func0. Elements of any
%% other shape never match.
%% NOTE(review): a match on either the module or the function alone is
%% enough here - confirm whether requiring both was intended.
contains_actions(Actions, Mod0, Func0) ->
    IsMatch =
        fun
            (#{mod := Mod, func := Func}) -> Mod =:= Mod0 orelse Func =:= Func0;
            (_Other) -> false
        end,
    lists:any(IsMatch, Actions).
%% True when the parsed form of BridgeId is (exactly) one of the already
%% parsed Actions.
forwards_to_bridge(Actions, BridgeId) ->
    ParsedBridgeAction = do_parse_action(BridgeId),
    lists:member(ParsedBridgeAction, Actions).
%% True when one of the `from' topics is a bridge hookpoint that
%% emqx_bridge_resource:bridge_hookpoint_to_bridge_id/1 resolves to BridgeId.
references_ingress_bridge(Froms, BridgeId) ->
    ReferencedIds =
        lists:filtermap(
            fun(From) ->
                case emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From) of
                    {ok, RefBridgeId} -> {true, RefBridgeId};
                    _NotAHookpoint -> false
                end
            end,
            Froms
        ),
    lists:member(BridgeId, ReferencedIds).
%% Returns, in order, the bridge ids of those `from' topics that
%% emqx_bridge_resource:bridge_hookpoint_to_bridge_id/1 resolves with
%% {ok, BridgeID}; all other topics are skipped.
get_referenced_hookpoints(Froms) ->
    lists:filtermap(
        fun(From) ->
            case emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From) of
                {ok, BridgeID} -> {true, BridgeID};
                _NotAHookpoint -> false
            end
        end,
        Froms
    ).
%% Returns, in order, the bridge id of every parsed action of the form
%% {bridge, Type, Name, ResId} or {bridge_v2, Type, Name}; any other action
%% is skipped.
get_egress_bridges(Actions) ->
    lists:filtermap(
        fun
            ({bridge, Type, Name, _ResId}) ->
                {true, emqx_bridge_resource:bridge_id(Type, Name)};
            ({bridge_v2, Type, Name}) ->
                {true, emqx_bridge_resource:bridge_id(Type, Name)};
            (_NotABridge) ->
                false
        end,
        Actions
    ).
%% For allowing an external application to add extra "built-in" functions to the
%% rule engine's SQL-like language. The module set by
%% set_extra_functions_module/1 should export a function called
%% handle_rule_function with two parameters (the first being an atom for the
%% function name and the second a list of arguments). The function should
%% return the result or {error, no_match_for_function} if it cannot
%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs
%% module. See also callback function declaration in emqx_rule_funcs.erl.
%% Returns the module registered with set_extra_functions_module/1, or
%% `undefined' when none has been set. The value lives in a persistent term
%% keyed by {?MODULE, extra_functions}.
-spec extra_functions_module() -> module() | undefined.
extra_functions_module() ->
    Key = {?MODULE, extra_functions},
    persistent_term:get(Key, undefined).
%% Registers Mod as the provider of extra rule SQL functions by storing it in
%% the persistent term keyed by {?MODULE, extra_functions}.
-spec set_extra_functions_module(module()) -> ok.
set_extra_functions_module(Mod) ->
    Key = {?MODULE, extra_functions},
    persistent_term:put(Key, Mod),
    ok.
%% Checks that every bridge referenced by a rule exists. The rule's actions
%% must already be parsed.
%%
%% References are collected from two places:
%%   - `from' hookpoints: classified as `source' when the bridge type is an
%%     action type according to emqx_action_info, otherwise as `bridge_v1';
%%   - actions: {bridge_v2, ...} becomes `action', {bridge, ...} becomes
%%     `bridge_v1'; other actions are ignored.
%% A reference counts as missing when its lookup returns {error, _}, returns
%% anything other than {ok, _}, or raises.
%%
%% Returns ok, or {error, #{nonexistent_bridge_ids => [{Kind, Type, Name}]}}.
validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rule) ->
    ClassifyHookpoint =
        fun(BridgeID) ->
            %% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it
            %% returns v1 types when attempting to "upgrade".....
            {Type, Name} =
                emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
            case emqx_action_info:is_action_type(Type) of
                true -> {source, Type, Name};
                false -> {bridge_v1, Type, Name}
            end
        end,
    IngressRefs = lists:map(ClassifyHookpoint, get_referenced_hookpoints(Froms)),
    ClassifyAction =
        fun
            ({bridge_v2, Type, Name}) -> {true, {action, Type, Name}};
            ({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}};
            (_NotABridge) -> false
        end,
    EgressRefs = lists:filtermap(ClassifyAction, Actions),
    IsMissing =
        fun({Kind, Type, Name}) ->
            LookupFn =
                case Kind of
                    action -> fun emqx_bridge_v2:lookup_action/2;
                    source -> fun emqx_bridge_v2:lookup_source/2;
                    bridge_v1 -> fun emqx_bridge:lookup/2
                end,
            %% the result match stays inside the try on purpose: an
            %% unexpected lookup result is treated as "missing" as well
            try
                case LookupFn(Type, Name) of
                    {ok, _} -> false;
                    {error, _} -> true
                end
            catch
                _:_ -> true
            end
        end,
    case lists:filter(IsMissing, IngressRefs ++ EgressRefs) of
        [] -> ok;
        Missing -> {error, #{nonexistent_bridge_ids => Missing}}
    end.