481 lines
14 KiB
Erlang
481 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_engine).
|
|
|
|
-behaviour(gen_server).
|
|
-behaviour(emqx_config_handler).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
|
|
-export([start_link/0]).
|
|
|
|
-export([
|
|
post_config_update/5,
|
|
config_key_path/0
|
|
]).
|
|
|
|
%% Rule Management
|
|
|
|
-export([load_rules/0]).
|
|
|
|
-export([
|
|
create_rule/1,
|
|
insert_rule/1,
|
|
update_rule/1,
|
|
delete_rule/1,
|
|
get_rule/1
|
|
]).
|
|
|
|
-export([
|
|
get_rules/0,
|
|
get_rules_for_topic/1,
|
|
get_rules_with_same_event/1,
|
|
get_rule_ids_by_action/1,
|
|
ensure_action_removed/2,
|
|
get_rules_ordered_by_ts/0
|
|
]).
|
|
|
|
%% exported for cluster_call
|
|
-export([
|
|
do_delete_rule/1,
|
|
do_insert_rule/1
|
|
]).
|
|
|
|
-export([
|
|
load_hooks_for_rule/1,
|
|
unload_hooks_for_rule/1,
|
|
maybe_add_metrics_for_rule/1,
|
|
clear_metrics_for_rule/1,
|
|
reset_metrics_for_rule/1
|
|
]).
|
|
|
|
%% exported for `emqx_telemetry'
|
|
-export([get_basic_usage_info/0]).
|
|
|
|
-export([now_ms/0]).
|
|
|
|
%% gen_server Callbacks
|
|
-export([
|
|
init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
-define(RULE_ENGINE, ?MODULE).
|
|
|
|
-define(T_CALL, infinity).
|
|
|
|
%% NOTE: This order cannot be changed! This is to make the metric working during relup.
|
|
%% Append elements to this list to add new metrics.
|
|
-define(METRICS, [
|
|
'matched',
|
|
'passed',
|
|
'failed',
|
|
'failed.exception',
|
|
'failed.no_result',
|
|
'actions.total',
|
|
'actions.success',
|
|
'actions.failed',
|
|
'actions.failed.out_of_service',
|
|
'actions.failed.unknown'
|
|
]).
|
|
|
|
-define(RATE_METRICS, ['matched']).
|
|
|
|
-type action_name() :: binary() | #{function := binary()}.
|
|
|
|
config_key_path() ->
|
|
[rule_engine, rules].
|
|
|
|
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
|
|
start_link() ->
|
|
gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% The config handler for emqx_rule_engine
|
|
%%------------------------------------------------------------------------------
|
|
post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
emqx_map_lib:diff_maps(NewRules, OldRules),
|
|
maps_foreach(
|
|
fun({Id, {_Old, New}}) ->
|
|
{ok, _} = update_rule(New#{id => bin(Id)})
|
|
end,
|
|
Updated
|
|
),
|
|
maps_foreach(
|
|
fun({Id, _Rule}) ->
|
|
ok = delete_rule(bin(Id))
|
|
end,
|
|
Removed
|
|
),
|
|
maps_foreach(
|
|
fun({Id, Rule}) ->
|
|
{ok, _} = create_rule(Rule#{id => bin(Id)})
|
|
end,
|
|
Added
|
|
),
|
|
{ok, get_rules()}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% APIs for rules
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec load_rules() -> ok.
|
|
load_rules() ->
|
|
maps_foreach(
|
|
fun
|
|
({Id, #{metadata := #{created_at := CreatedAt}} = Rule}) ->
|
|
create_rule(Rule#{id => bin(Id)}, CreatedAt);
|
|
({Id, Rule}) ->
|
|
create_rule(Rule#{id => bin(Id)})
|
|
end,
|
|
emqx:get_config([rule_engine, rules], #{})
|
|
).
|
|
|
|
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
create_rule(Params) ->
|
|
create_rule(Params, now_ms()).
|
|
|
|
create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
not_found -> parse_and_insert(Params, CreatedAt);
|
|
{ok, _} -> {error, already_exists}
|
|
end.
|
|
|
|
-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
not_found ->
|
|
{error, not_found};
|
|
{ok, #{created_at := CreatedAt}} ->
|
|
parse_and_insert(Params, CreatedAt)
|
|
end.
|
|
|
|
-spec delete_rule(RuleId :: rule_id()) -> ok.
|
|
delete_rule(RuleId) when is_binary(RuleId) ->
|
|
gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL).
|
|
|
|
-spec insert_rule(Rule :: rule()) -> ok.
|
|
insert_rule(Rule) ->
|
|
gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Rule Management
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec get_rules() -> [rule()].
|
|
get_rules() ->
|
|
get_all_records(?RULE_TAB).
|
|
|
|
get_rules_ordered_by_ts() ->
|
|
lists:sort(
|
|
fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
|
|
CreatedA =< CreatedB
|
|
end,
|
|
get_rules()
|
|
).
|
|
|
|
-spec get_rules_for_topic(Topic :: binary()) -> [rule()].
|
|
get_rules_for_topic(Topic) ->
|
|
[
|
|
Rule
|
|
|| Rule = #{from := From} <- get_rules(),
|
|
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)
|
|
].
|
|
|
|
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
|
|
get_rules_with_same_event(Topic) ->
|
|
EventName = emqx_rule_events:event_name(Topic),
|
|
[
|
|
Rule
|
|
|| Rule = #{from := From} <- get_rules(),
|
|
lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
|
|
].
|
|
|
|
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
|
|
get_rule_ids_by_action(ActionName) when is_binary(ActionName) ->
|
|
[
|
|
Id
|
|
|| #{actions := Acts, id := Id} <- get_rules(),
|
|
lists:any(fun(A) -> A =:= ActionName end, Acts)
|
|
];
|
|
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
|
|
{Mod, Fun} =
|
|
case string:split(FuncName, ":", leading) of
|
|
[M, F] -> {binary_to_module(M), F};
|
|
[F] -> {emqx_rule_actions, F}
|
|
end,
|
|
[
|
|
Id
|
|
|| #{actions := Acts, id := Id} <- get_rules(),
|
|
contains_actions(Acts, Mod, Fun)
|
|
].
|
|
|
|
-spec ensure_action_removed(rule_id(), action_name()) -> ok.
|
|
ensure_action_removed(RuleId, ActionName) ->
|
|
FilterFunc =
|
|
fun
|
|
(Func, Func) -> false;
|
|
(#{<<"function">> := Func}, #{function := Func}) -> false;
|
|
(_, _) -> true
|
|
end,
|
|
case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
|
|
not_found ->
|
|
ok;
|
|
#{<<"actions">> := Acts} ->
|
|
NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
|
|
{ok, _} = emqx_conf:update(
|
|
emqx_rule_engine:config_key_path() ++ [RuleId, actions],
|
|
NewActs,
|
|
#{override_to => cluster}
|
|
),
|
|
ok
|
|
end.
|
|
|
|
is_of_event_name(EventName, Topic) ->
|
|
EventName =:= emqx_rule_events:event_name(Topic).
|
|
|
|
-spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
|
|
get_rule(Id) ->
|
|
case ets:lookup(?RULE_TAB, Id) of
|
|
[{Id, Rule}] -> {ok, Rule#{id => Id}};
|
|
[] -> not_found
|
|
end.
|
|
|
|
load_hooks_for_rule(#{from := Topics}) ->
|
|
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
|
|
|
maybe_add_metrics_for_rule(Id) ->
|
|
case emqx_metrics_worker:has_metrics(rule_metrics, Id) of
|
|
true ->
|
|
ok = reset_metrics_for_rule(Id);
|
|
false ->
|
|
ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS)
|
|
end.
|
|
|
|
clear_metrics_for_rule(Id) ->
|
|
ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
|
|
|
|
-spec reset_metrics_for_rule(rule_id()) -> ok.
|
|
reset_metrics_for_rule(Id) ->
|
|
emqx_metrics_worker:reset_metrics(rule_metrics, Id).
|
|
|
|
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
|
lists:foreach(
|
|
fun(Topic) ->
|
|
case get_rules_with_same_event(Topic) of
|
|
%% we are now deleting the last rule
|
|
[#{id := Id0}] when Id0 == Id ->
|
|
emqx_rule_events:unload(Topic);
|
|
_ ->
|
|
ok
|
|
end
|
|
end,
|
|
Topics
|
|
).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Telemetry helper functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec get_basic_usage_info() ->
|
|
#{
|
|
num_rules => non_neg_integer(),
|
|
referenced_bridges =>
|
|
#{BridgeType => non_neg_integer()}
|
|
}
|
|
when
|
|
BridgeType :: atom().
|
|
get_basic_usage_info() ->
|
|
try
|
|
Rules = get_rules(),
|
|
EnabledRules =
|
|
lists:filter(
|
|
fun(#{enable := Enabled}) -> Enabled end,
|
|
Rules
|
|
),
|
|
NumRules = length(EnabledRules),
|
|
ReferencedBridges =
|
|
lists:foldl(
|
|
fun(#{actions := Actions, from := From}, Acc) ->
|
|
BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From],
|
|
BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
|
|
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
|
|
end,
|
|
#{},
|
|
EnabledRules
|
|
),
|
|
#{
|
|
num_rules => NumRules,
|
|
referenced_bridges => ReferencedBridges
|
|
}
|
|
catch
|
|
_:_ ->
|
|
#{
|
|
num_rules => 0,
|
|
referenced_bridges => #{}
|
|
}
|
|
end.
|
|
|
|
tally_referenced_bridges(BridgeIDs, Acc0) ->
|
|
lists:foldl(
|
|
fun(BridgeID, Acc) ->
|
|
{BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeID),
|
|
maps:update_with(
|
|
BridgeType,
|
|
fun(X) -> X + 1 end,
|
|
1,
|
|
Acc
|
|
)
|
|
end,
|
|
Acc0,
|
|
BridgeIDs
|
|
).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
_TableId = ets:new(?KV_TAB, [
|
|
named_table,
|
|
set,
|
|
public,
|
|
{write_concurrency, true},
|
|
{read_concurrency, true}
|
|
]),
|
|
ok = emqx_config_handler:add_handler(
|
|
[rule_engine, jq_implementation_module],
|
|
emqx_rule_engine_schema
|
|
),
|
|
{ok, #{}}.
|
|
|
|
handle_call({insert_rule, Rule}, _From, State) ->
|
|
do_insert_rule(Rule),
|
|
{reply, ok, State};
|
|
handle_call({delete_rule, Rule}, _From, State) ->
|
|
do_delete_rule(Rule),
|
|
{reply, ok, State};
|
|
handle_call(Req, _From, State) ->
|
|
?SLOG(error, #{msg => "unexpected_call", request => Req}),
|
|
{reply, ignored, State}.
|
|
|
|
handle_cast(Msg, State) ->
|
|
?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
|
|
{noreply, State}.
|
|
|
|
handle_info(Info, State) ->
|
|
?SLOG(error, #{msg => "unexpected_info", request => Info}),
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal Functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
|
|
case emqx_rule_sqlparser:parse(Sql) of
|
|
{ok, Select} ->
|
|
Rule = #{
|
|
id => RuleId,
|
|
name => maps:get(name, Params, <<"">>),
|
|
created_at => CreatedAt,
|
|
updated_at => now_ms(),
|
|
enable => maps:get(enable, Params, true),
|
|
sql => Sql,
|
|
actions => parse_actions(Actions),
|
|
description => maps:get(description, Params, ""),
|
|
%% -- calculated fields:
|
|
from => emqx_rule_sqlparser:select_from(Select),
|
|
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
|
|
fields => emqx_rule_sqlparser:select_fields(Select),
|
|
doeach => emqx_rule_sqlparser:select_doeach(Select),
|
|
incase => emqx_rule_sqlparser:select_incase(Select),
|
|
conditions => emqx_rule_sqlparser:select_where(Select)
|
|
%% -- calculated fields end
|
|
},
|
|
ok = insert_rule(Rule),
|
|
{ok, Rule};
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
do_insert_rule(#{id := Id} = Rule) ->
|
|
ok = load_hooks_for_rule(Rule),
|
|
ok = maybe_add_metrics_for_rule(Id),
|
|
true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
|
|
ok.
|
|
|
|
do_delete_rule(RuleId) ->
|
|
case get_rule(RuleId) of
|
|
{ok, Rule} ->
|
|
ok = unload_hooks_for_rule(Rule),
|
|
ok = clear_metrics_for_rule(RuleId),
|
|
true = ets:delete(?RULE_TAB, RuleId),
|
|
ok;
|
|
not_found ->
|
|
ok
|
|
end.
|
|
|
|
parse_actions(Actions) ->
|
|
[do_parse_action(Act) || Act <- Actions].
|
|
|
|
do_parse_action(Action) when is_map(Action) ->
|
|
emqx_rule_actions:parse_action(Action);
|
|
do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
|
BridgeChannelId.
|
|
|
|
get_all_records(Tab) ->
|
|
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
|
|
|
|
maps_foreach(Fun, Map) ->
|
|
lists:foreach(Fun, maps:to_list(Map)).
|
|
|
|
now_ms() ->
|
|
erlang:system_time(millisecond).
|
|
|
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
|
bin(B) when is_binary(B) -> B.
|
|
|
|
binary_to_module(ModName) ->
|
|
try
|
|
binary_to_existing_atom(ModName, utf8)
|
|
catch
|
|
error:badarg ->
|
|
not_exist_mod
|
|
end.
|
|
|
|
contains_actions(Actions, Mod0, Func0) ->
|
|
lists:any(
|
|
fun
|
|
(#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true;
|
|
(_) -> false
|
|
end,
|
|
Actions
|
|
).
|