emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl

469 lines
18 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_runtime).
-include("rule_engine.hrl").
-include("rule_actions.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ apply_rule/2
, apply_rules/2
, clear_rule_payload/0
]).
-import(emqx_rule_maps,
[ nested_get/2
, range_gen/2
, range_get/3
]).
%% This module defines its own alias/1, so auto-import of a same-named BIF is
%% switched off -- presumably to avoid clashing with erlang:alias/1 on OTP
%% releases that provide it; TODO confirm.
-compile({no_auto_import,[alias/1]}).
%% The column map a rule is evaluated against.
-type(input() :: map()).
-type(alias() :: atom()).
%% The key a FOREACH item is bound to, paired with the items themselves.
-type(collection() :: {alias(), [term()]}).
%% Builds a binary key of the form `_v_<TYPE>_<NAME>_<system time>' for
%% selected fields that have no usable name of their own.
-define(ephemeral_alias(TYPE, NAME),
iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).
%% Initial retry budget handed to take_action/5 for every action instance.
-define(ActionMaxRetry, 3).
%%------------------------------------------------------------------------------
%% Apply rules
%%------------------------------------------------------------------------------
%% @doc Run every enabled rule of the list against `Input', in list order.
%% Rules whose `enabled' field is `false' are skipped. The result of each
%% individual rule is discarded, so the call always yields `ok'.
-spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
apply_rules(Rules, Input) ->
    lists:foreach(
      fun(#rule{enabled = false}) ->
              ok;
         (Rule) ->
              _ = apply_rule(Rule, Input),
              ok
      end, Rules).
%% @doc Evaluate one rule against `Input'.
%%
%% The cached decoded payload is dropped first and the rule's `matched'
%% counter is bumped. Returns whatever do_apply_rule/2 returns, or
%% `{error, Reason}' when evaluation raises. Every exception is counted
%% through emqx_rule_metrics:inc_rules_exception/1 and logged.
apply_rule(#rule{id = RuleId} = Rule, Input) ->
    clear_rule_payload(),
    ok = emqx_rule_metrics:inc_rules_matched(RuleId),
    try
        %% `metadata' and `rule_id' have to be reachable from the SQL, so
        %% they are attached to the input before the rule is evaluated.
        do_apply_rule(Rule, emqx_rule_utils:add_metadata(Input, #{rule_id => RuleId}))
    catch
        %% Failures raised from inside a SQL clause are only worth a warning;
        %% the tagged reason is handed back to the caller unchanged.
        _:{select_and_transform_error, Cause} = Failure ->
            emqx_rule_metrics:inc_rules_exception(RuleId),
            ?LOG(warning, "SELECT clause exception for ~s failed: ~p",
                 [RuleId, Cause]),
            {error, Failure};
        _:{match_conditions_error, Cause} = Failure ->
            emqx_rule_metrics:inc_rules_exception(RuleId),
            ?LOG(warning, "WHERE clause exception for ~s failed: ~p",
                 [RuleId, Cause]),
            {error, Failure};
        _:{select_and_collect_error, Cause} = Failure ->
            emqx_rule_metrics:inc_rules_exception(RuleId),
            ?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
                 [RuleId, Cause]),
            {error, Failure};
        _:{match_incase_error, Cause} = Failure ->
            emqx_rule_metrics:inc_rules_exception(RuleId),
            ?LOG(warning, "INCASE clause exception for ~s failed: ~p",
                 [RuleId, Cause]),
            {error, Failure};
        %% Anything else is unexpected: log it as an error and return the
        %% stacktrace together with the exception reason.
        _:Unexpected:Stacktrace ->
            emqx_rule_metrics:inc_rules_exception(RuleId),
            ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
                 [RuleId, Unexpected, Stacktrace]),
            {error, {Unexpected, Stacktrace}}
    end.
%% Evaluate the rule's clauses against `Input' and, when the conditions hold,
%% run the rule's actions.
%%
%% FOREACH rules run the action list once per item that survives the INCASE
%% filter and return `{ok, [ActionResults]}'; plain SELECT rules run it once
%% and return `{ok, ActionResults}'. When the conditions do not hold the
%% result is `{error, nomatch}'. Clause failures are re-raised through ?RAISE
%% with a tag naming the clause that failed.
do_apply_rule(#rule{is_foreach = true,
                    id = RuleId,
                    fields = Fields,
                    doeach = DoEach,
                    incase = InCase,
                    conditions = Conditions,
                    on_action_failed = OnFailed,
                    actions = Actions}, Input) ->
    {Selected, Collection} =
        ?RAISE(select_and_collect(Fields, Input),
               {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
    Matched = ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                     {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
    case Matched of
        false ->
            ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
            {error, nomatch};
        true ->
            Items = filter_collection(Input, InCase, DoEach, Collection),
            %% An empty collection counts as "no result" even though the
            %% conditions themselves matched.
            case Items of
                [] -> emqx_rule_metrics:inc_rules_no_result(RuleId);
                _ -> emqx_rule_metrics:inc_rules_passed(RuleId)
            end,
            {ok, [take_actions(Actions, Item, Input, OnFailed) || Item <- Items]}
    end;
do_apply_rule(#rule{is_foreach = false,
                    id = RuleId,
                    fields = Fields,
                    conditions = Conditions,
                    on_action_failed = OnFailed,
                    actions = Actions}, Input) ->
    Selected = ?RAISE(select_and_transform(Fields, Input),
                      {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
    Matched = ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                     {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
    case Matched of
        false ->
            ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
            {error, nomatch};
        true ->
            ok = emqx_rule_metrics:inc_rules_passed(RuleId),
            {ok, take_actions(Actions, Selected, Input, OnFailed)}
    end.
%% @doc Remove the `rule_payload' entry from the calling process's dictionary.
%% Returns the value that was stored there, or `undefined' if there was none.
clear_rule_payload() ->
    erase(rule_payload).
%% SELECT clause.
%%
%% Walks the selected fields from left to right and returns the map of
%% selected values. Each value is also written back into the working copy of
%% the input, so a later field can refer to an earlier one.
select_and_transform(Fields, Input) ->
    select_and_transform(Fields, Input, #{}).

select_and_transform(Fields, Input, Output0) ->
    {_Columns, Output} = lists:foldl(fun select_field/2, {Input, Output0}, Fields),
    Output.

%% Fold step: process one field against the {WorkingInput, Output} pair.
select_field('*', {Columns, Output}) ->
    %% `*' copies every input column; input values win over earlier output.
    {Columns, maps:merge(Output, Columns)};
select_field({as, Field, Alias}, {Columns, Output}) ->
    Val = eval(Field, Columns),
    {nested_put(Alias, Val, Columns), nested_put(Alias, Val, Output)};
select_field(Field, {Columns, Output}) ->
    Val = eval(Field, Columns),
    Key = alias(Field),
    {nested_put(Key, Val, Columns), nested_put(Key, Val, Output)}.
%% FOREACH clause.
%%
%% Processes the fields like the SELECT clause does, except that the LAST
%% field provides the collection to iterate over. Returns the selected
%% columns together with `{Key, Items}': `Key' comes from the last field's
%% `AS' alias when it has one and is 'item' otherwise; `Items' is the last
%% field's value, or `[]' when that value is not a list.
-spec select_and_collect(list(), input()) -> {input(), collection()}.
select_and_collect(Fields, Input) ->
    select_and_collect(Fields, Input, {#{}, {'item', []}}).

%% Last field, explicitly aliased: the alias name becomes the item key.
select_and_collect([{as, Expr, {_, Name} = Alias}], Columns, {Selected, _}) ->
    Items = eval(Expr, Columns),
    {nested_put(Alias, Items, Selected), {Name, ensure_list(Items)}};
select_and_collect([{as, Expr, Alias} | Rest], Columns, {Selected, Coll}) ->
    Value = eval(Expr, Columns),
    select_and_collect(Rest,
                       nested_put(Alias, Value, Columns),
                       {nested_put(Alias, Value, Selected), Coll});
%% Last field without an alias: items are bound to the default key 'item'.
select_and_collect([Expr], Columns, {Selected, _}) ->
    Items = eval(Expr, Columns),
    Name = alias(Expr),
    {nested_put(Name, Items, Selected), {'item', ensure_list(Items)}};
select_and_collect([Expr | Rest], Columns, {Selected, Coll}) ->
    Value = eval(Expr, Columns),
    Name = alias(Expr),
    select_and_collect(Rest,
                       nested_put(Name, Value, Columns),
                       {nested_put(Name, Value, Selected), Coll}).
%% Filter (INCASE) and optionally transform (DO) every item of a FOREACH
%% collection.
%%
%% Each item is merged into `Input' under `CollKey' before the INCASE
%% conditions are checked. Items that pass are kept: as the merged map when
%% there is no DO clause, otherwise as the result of running the DO fields
%% against the merged map. Failures are re-raised as `match_incase_error' or
%% `doeach_error'.
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
    KeepItem =
        fun(Item) ->
            Scope = maps:merge(Input, #{CollKey => Item}),
            Matched = ?RAISE(match_conditions(InCase, Scope),
                             {match_incase_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
            case Matched of
                false ->
                    false;
                true when DoEach == [] ->
                    {true, Scope};
                true ->
                    {true, ?RAISE(select_and_transform(DoEach, Scope),
                                  {doeach_error, {_EXCLASS_,_EXCPTION_,_ST_}})}
            end
        end,
    lists:filtermap(KeepItem, CollVal).
%% Conditional clauses such as WHERE and WHEN.
%%
%% Evaluates a condition tree against `Data'. The empty tuple `{}' stands for
%% "no condition" and always holds.
match_conditions({'and', Lhs, Rhs}, Data) ->
    match_conditions(Lhs, Data) andalso match_conditions(Rhs, Data);
match_conditions({'or', Lhs, Rhs}, Data) ->
    match_conditions(Lhs, Data) orelse match_conditions(Rhs, Data);
match_conditions({'not', Expr}, Data) ->
    %% Only a literal `false' negates to `true'; `true' and every non-boolean
    %% value yield `false'.
    eval(Expr, Data) =:= false;
match_conditions({in, Expr, {list, Candidates}}, Data) ->
    Needle = eval(Expr, Data),
    Haystack = [eval(Candidate, Data) || Candidate <- Candidates],
    lists:member(Needle, Haystack);
match_conditions({'fun', {_, Name}, Args}, Data) ->
    apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
match_conditions({Op, Lhs, Rhs}, Data) when ?is_comp(Op) ->
    compare(Op, eval(Lhs, Data), eval(Rhs, Data));
%%match_conditions({'like', Var, Pattern}, Data) ->
%%    match_like(eval(Var, Data), Pattern);
match_conditions({}, _Data) ->
    true.
%% Compare two values after making mixed operand types comparable.
%%
%% A number compared with a binary has the binary parsed as a number; an atom
%% compared with a binary is converted to its UTF-8 binary form. Operands are
%% left untouched when either side is `undefined' or when no rule applies.
compare(Op, L, R) ->
    {Lhs, Rhs} = coerce_operands(L, R),
    do_compare(Op, Lhs, Rhs).

%% Bring a pair of operands to a common type; clause order is significant.
coerce_operands(L, R) when L == undefined; R == undefined ->
    {L, R};
coerce_operands(L, R) when is_number(L), is_binary(R) ->
    {L, number(R)};
coerce_operands(L, R) when is_binary(L), is_number(R) ->
    {number(L), R};
coerce_operands(L, R) when is_atom(L), is_binary(R) ->
    {atom_to_binary(L, utf8), R};
coerce_operands(L, R) when is_binary(L), is_atom(R) ->
    {L, atom_to_binary(R, utf8)};
coerce_operands(L, R) ->
    {L, R}.
%% Apply a comparison operator to two already-coerced operands.
%%
%% Ordering operators never hold when either side is `undefined', although
%% '<=' and '>=' still hold when both sides compare equal. '=~' is a topic
%% match: true for two `undefined's, false when exactly one side is
%% `undefined', otherwise the result of emqx_topic:match/2.
do_compare('=', L, R) ->
    L == R;
do_compare('<>', L, R) ->
    L /= R;
do_compare('!=', L, R) ->
    L /= R;
do_compare('>', L, R) ->
    L =/= undefined andalso R =/= undefined andalso L > R;
do_compare('<', L, R) ->
    L =/= undefined andalso R =/= undefined andalso L < R;
do_compare('>=', L, R) ->
    L == R orelse do_compare('>', L, R);
do_compare('<=', L, R) ->
    L == R orelse do_compare('<', L, R);
do_compare('=~', Topic, Filter) ->
    case {Topic, Filter} of
        {undefined, undefined} -> true;
        {undefined, _} -> false;
        {_, undefined} -> false;
        {_, _} -> emqx_topic:match(Topic, Filter)
    end.
%% Parse a binary as a number: an integer when possible, otherwise a float.
%% Raises `badarg' when the binary is neither.
number(Bin) ->
    try binary_to_integer(Bin) of
        Int -> Int
    catch
        error:badarg ->
            binary_to_float(Bin)
    end.
%% Step 3: take actions.
%%
%% Runs every action instance in list order and returns the list of their
%% results. Each instance gets `Envs' enriched with its own metadata and
%% starts with the full retry budget. Fallback actions reach this function
%% with `rule_id' already present in `metadata'.
take_actions(Actions, Selected, Envs, OnFailed) ->
    RunOne =
        fun(ActInst) ->
            ActEnvs = emqx_rule_utils:add_metadata(Envs, ActInst),
            take_action(ActInst, Selected, ActEnvs, OnFailed, ?ActionMaxRetry)
        end,
    [RunOne(ActInst) || ActInst <- Actions].
%% Run one action instance against the selected data.
%%
%% Returns the action's own result, or the result of handle_action_failure/6
%% when the action reports `{badact, Reason}', raises, or runs out of
%% retries. `RetryN' is the remaining retry budget; an attempt is made for
%% every value down to and including 0.
take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
Selected, Envs, OnFailed, RetryN) when RetryN >= 0 ->
try
%% A missing instance makes this match fail; the resulting badmatch is
%% handled by the generic catch clause below.
{ok, #action_instance_params{apply = Apply}}
= emqx_rule_registry:get_action_instance_params(Id),
emqx_rule_metrics:inc_actions_taken(Id),
apply_action_func(Selected, Envs, Apply, ActName)
of
%% The `of' section is outside the protected part of the `try', so
%% exceptions raised by the failure handler are not caught here.
{badact, Reason} ->
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
Result -> Result
catch
%% The stored callback fun is no longer callable (the disabled log message
%% below calls the action "maybe outdated"): refresh the action under its
%% lock, count the retry and try again with a smaller budget.
error:{badfun, _Func}:_ST ->
%?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
% "Func: ~p~nST:~0p", [Id, Func, ST]),
_ = trans_action_on(Id, fun() ->
emqx_rule_engine:refresh_actions([ActInst])
end, 5000),
emqx_rule_metrics:inc_actions_retry(Id),
take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
%% Any other exception is counted and handed to the failure handler together
%% with its class and stacktrace.
Error:Reason:Stack ->
emqx_rule_metrics:inc_actions_exception(Id),
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
end;
%% Retry budget exhausted (RetryN < 0): count an error and fail the action.
take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
emqx_rule_metrics:inc_actions_error(Id),
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
%% Invoke an action's `apply' entry with the selected data and environment.
%%
%% The entry is either a map naming a callback module plus its bindings, in
%% which case `Mod:on_action_<Name>/2' is called with the bindings stored
%% under '__bindings__' in the environment, or a fun that is called directly.
apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
    %% TODO: Build the Func Name when creating the action
    Callback = cbk_on_action_triggered(Name),
    EnvsWithBindings = Envs#{'__bindings__' => Bindings},
    Mod:Callback(Data, EnvsWithBindings);
apply_action_func(Data, Envs, Func, _Name) when is_function(Func) ->
    Func(Data, Envs).
%% Name of the callback function for action `Name': the atom
%% `on_action_<Name>'.
cbk_on_action_triggered(Name) ->
    Suffix = atom_to_list(Name),
    list_to_atom("on_action_" ++ Suffix).
%% Run `Callback' while holding the lock for action `Id'.
%%
%% When the lock is obtained the callback's result is returned and the lock
%% is released even if the callback raises. Otherwise the call polls through
%% wait_action_on/2 with `Timeout div 10' attempts.
trans_action_on(Id, Callback, Timeout) ->
    case emqx_rule_locker:lock(Id) of
        true ->
            try
                Callback()
            after
                emqx_rule_locker:unlock(Id)
            end;
        _NotAcquired ->
            wait_action_on(Id, Timeout div 10)
    end.
%% Poll until the callback of action `Id' is callable again.
%%
%% Sleeps 10 ms per attempt, then probes the stored callback with dummy
%% arguments. The poll continues only while the probe fails with `badfun';
%% any other outcome of the probe counts as success. Returns `ok',
%% `{error, not_found}' when the instance is gone, or `{error, timeout}' once
%% the attempts are used up.
wait_action_on(_Id, 0) ->
    {error, timeout};
wait_action_on(Id, RetryN) ->
    timer:sleep(10),
    case emqx_rule_registry:get_action_instance_params(Id) of
        {ok, #action_instance_params{apply = Apply}} ->
            Probe = (catch apply_action_func(baddata, #{}, Apply, tryit)),
            case Probe of
                {'EXIT', {{badfun, _}, _}} ->
                    wait_action_on(Id, RetryN - 1);
                _ ->
                    ok
            end;
        not_found ->
            {error, not_found}
    end.
%% React to a failed action according to the rule's `on_action_failed' policy.
%%
%% Both policies log the failure and run the fallback actions, which are
%% themselves always run with the `continue' policy. `continue' then returns
%% `failed' so the remaining actions still run; `stop' raises
%% `{take_action_failed, {Id, Cause}}' instead.
handle_action_failure(continue, _Id, Fallbacks, Selected,
                      #{metadata := Metadata} = Envs, Cause) ->
    ?LOG_RULE_ACTION(error, Metadata, "Continue next action, reason: ~0p", [Cause]),
    _ = take_actions(Fallbacks, Selected, Envs, continue),
    failed;
handle_action_failure(stop, Id, Fallbacks, Selected,
                      #{metadata := Metadata} = Envs, Cause) ->
    ?LOG_RULE_ACTION(error, Metadata, "Skip all actions, reason: ~0p", [Cause]),
    _ = take_actions(Fallbacks, Selected, Envs, continue),
    error({take_action_failed, {Id, Cause}}).
%% Evaluate a parsed SQL expression against the column map `Columns'.
%%
%% Paths that start at `payload' (atom or binary key) are resolved inside the
%% decoded payload rather than the raw one. Clause order is significant.
eval({path, [{key, <<"payload">>} | SubPath]}, #{payload := Payload}) ->
    nested_get({path, SubPath}, may_decode_payload(Payload));
eval({path, [{key, <<"payload">>} | SubPath]}, #{<<"payload">> := Payload}) ->
    nested_get({path, SubPath}, may_decode_payload(Payload));
eval({path, _} = Path, Columns) ->
    nested_get(Path, Columns);
eval({range, {From, To}}, _Columns) ->
    range_gen(From, To);
eval({get_range, {From, To}, Expr}, Columns) ->
    range_get(From, To, eval(Expr, Columns));
eval({var, _} = Var, Columns) ->
    nested_get(Var, Columns);
eval({const, Value}, _Columns) ->
    Value;
%% unary plus
eval({'+', Operand}, Columns) ->
    eval(Operand, Columns);
%% unary minus
eval({'-', Operand}, Columns) ->
    -(eval(Operand, Columns));
eval({Op, Lhs, Rhs}, Columns) when ?is_arith(Op) ->
    apply_func(Op, [eval(Lhs, Columns), eval(Rhs, Columns)], Columns);
eval({Op, Lhs, Rhs}, Columns) when ?is_comp(Op) ->
    compare(Op, eval(Lhs, Columns), eval(Rhs, Columns));
eval({list, Elements}, Columns) ->
    [eval(Element, Columns) || Element <- Elements];
%% CASE without a subject: every WHEN branch is a condition.
eval({'case', <<>>, WhenClauses, ElseClause}, Columns) ->
    eval_case_clauses(WhenClauses, ElseClause, Columns);
%% CASE with a subject: every WHEN branch is a value to match against it.
eval({'case', Subject, WhenClauses, ElseClause}, Columns) ->
    eval_switch_clauses(Subject, WhenClauses, ElseClause, Columns);
eval({'fun', {_, Name}, Args}, Columns) ->
    apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
%% Prepare `Columns' for a write through `Alias'.
%%
%% When the alias is a path that starts at `payload', the payload stored in
%% the map (atom or binary key) is replaced by its decoded form so the nested
%% write lands inside the decoded structure. Other aliases leave the map
%% untouched.
handle_alias({path, [{key, <<"payload">>} | _]}, Columns = #{payload := Raw}) ->
    Columns#{payload => may_decode_payload(Raw)};
handle_alias({path, [{key, <<"payload">>} | _]}, Columns = #{<<"payload">> := Raw}) ->
    Columns#{<<"payload">> => may_decode_payload(Raw)};
handle_alias(_Alias, Columns) ->
    Columns.
%% Derive the key under which a selected field without an explicit `AS' is
%% stored.
%%
%% Variables, paths and binary constants keep their own name; ranges taken
%% from a variable or path reuse that variable or path. Every other
%% expression gets a generated `_v_...' name. The result is always a
%% `{var, Name}' or `{path, Path}' tuple, because callers pass it on to
%% nested_put/3 as the key.
alias({var, Var}) ->
    {var, Var};
alias({const, Val}) when is_binary(Val) ->
    {var, Val};
alias({list, L}) ->
    {var, ?ephemeral_alias(list, length(L))};
alias({range, R}) ->
    {var, ?ephemeral_alias(range, R)};
alias({get_range, _, {var, Key}}) ->
    {var, Key};
alias({get_range, _, {path, Path}}) ->
    {path, Path};
alias({path, Path}) ->
    {path, Path};
alias({const, Val}) ->
    {var, ?ephemeral_alias(const, Val)};
alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
    {var, ?ephemeral_alias(op, Op)};
alias({'case', On, _, _}) ->
    {var, ?ephemeral_alias('case', On)};
alias({'fun', Name, _}) ->
    {var, ?ephemeral_alias('fun', Name)};
alias(_) ->
    %% Expressions with no dedicated clause (e.g. unary `+'/`-') must still
    %% produce a well-formed key; a bare binary here would not have the same
    %% shape as the keys returned by every other clause.
    {var, ?ephemeral_alias(unknown, unknown)}.
%% Evaluate a CASE expression that has no subject.
%%
%% The value of the first branch whose condition evaluates to `true' is
%% returned. When no branch matches, the ELSE expression is evaluated, or
%% `undefined' is returned when there is no ELSE (represented by `{}').
eval_case_clauses([{Cond, Clause} | Rest], ElseClauses, Input) ->
    case match_conditions(Cond, Input) of
        true -> eval(Clause, Input);
        _ -> eval_case_clauses(Rest, ElseClauses, Input)
    end;
eval_case_clauses([], {}, _Input) ->
    undefined;
eval_case_clauses([], ElseClauses, Input) ->
    eval(ElseClauses, Input).
%% Evaluate a CASE expression that has a subject (`CASE x WHEN v THEN ...').
%%
%% The subject is evaluated once and then matched, by exact term equality,
%% against the value of each WHEN expression in order; the first matching
%% branch provides the result. When no branch matches, the ELSE expression is
%% evaluated, or `undefined' is returned when there is no ELSE (represented
%% by `{}'). With no WHEN branches at all the subject is not evaluated.
eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
    match_switch_clauses(undefined, [], ElseClauses, Input);
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input) ->
    %% Evaluate the subject a single time: it does not depend on the branch
    %% being tested, and re-evaluating it per branch could compare different
    %% branches against different values if the subject is not deterministic.
    Subject = eval(CaseOn, Input),
    match_switch_clauses(Subject, CaseClauses, ElseClauses, Input).

%% Find the first WHEN branch whose value equals the evaluated subject.
match_switch_clauses(_Subject, [], {}, _Input) ->
    undefined;
match_switch_clauses(_Subject, [], ElseClauses, Input) ->
    eval(ElseClauses, Input);
match_switch_clauses(Subject, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
    case eval(Cond, Input) of
        Subject ->
            eval(Clause, Input);
        _ ->
            match_switch_clauses(Subject, CaseClauses, ElseClauses, Input)
    end.
%% Call the SQL function `Name' with the already-evaluated `Args'.
%%
%% A binary name is accepted only if it corresponds to an atom that already
%% exists; otherwise `{sql_function_not_supported, Name}' is raised, so
%% unknown names never create new atoms.
apply_func(Name, Args, Input) when is_atom(Name) ->
    do_apply_func(Name, Args, Input);
apply_func(Name, Args, Input) when is_binary(Name) ->
    try binary_to_existing_atom(Name, utf8) of
        FunName ->
            do_apply_func(FunName, Args, Input)
    catch
        error:badarg ->
            error({sql_function_not_supported, Name})
    end.
%% Dispatch to `emqx_rule_funcs:Name(Args...)'.
%%
%% When the function returns a fun, that fun is called with the current
%% input and its result is returned; any other result is returned as is.
do_apply_func(Name, Args, Input) ->
    Outcome = apply(emqx_rule_funcs, Name, Args),
    case is_function(Outcome) of
        true -> Outcome(Input);
        false -> Outcome
    end.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
%% Return the decoded form of a payload.
%%
%% Non-binary payloads are returned unchanged. For a binary payload the
%% decoded value cached in the process dictionary is reused when present;
%% otherwise the payload is decoded as JSON and the result is cached.
may_decode_payload(Payload) when not is_binary(Payload) ->
    Payload;
may_decode_payload(Payload) ->
    case get_cached_payload() of
        undefined -> safe_decode_and_cache(Payload);
        Cached -> Cached
    end.
%% Read the decoded payload cached in the process dictionary under
%% `rule_payload'; `undefined' when nothing is cached.
get_cached_payload() ->
    get(rule_payload).
%% Store a decoded payload in the process dictionary under `rule_payload'
%% and return it.
cache_payload(DecodedP) ->
    _Previous = put(rule_payload, DecodedP),
    DecodedP.
%% Decode a JSON binary into maps, cache the result and return it.
%% Any failure is reported as `{decode_json_failed, MaybeJson}'.
safe_decode_and_cache(MaybeJson) ->
    try
        Decoded = emqx_json:decode(MaybeJson, [return_maps]),
        cache_payload(Decoded)
    catch
        _Class:_Reason:_Stack ->
            error({decode_json_failed, MaybeJson})
    end.
%% Return the term itself when it is a list, and the empty list otherwise.
ensure_list(Term) ->
    case is_list(Term) of
        true -> Term;
        false -> []
    end.
%% Store `Val' under `Alias' in the map, after handle_alias/2 has had the
%% chance to swap in the decoded payload for aliases that point into it.
nested_put(Alias, Val, Input0) ->
    emqx_rule_maps:nested_put(Alias, Val, handle_alias(Alias, Input0)).