469 lines
18 KiB
Erlang
469 lines
18 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_runtime).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include("rule_actions.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([ apply_rule/2
|
|
, apply_rules/2
|
|
, clear_rule_payload/0
|
|
]).
|
|
|
|
-import(emqx_rule_maps,
|
|
[ nested_get/2
|
|
, range_gen/2
|
|
, range_get/3
|
|
]).
|
|
|
|
-compile({no_auto_import,[alias/1]}).
|
|
|
|
-type(input() :: map()).
|
|
-type(alias() :: atom()).
|
|
-type(collection() :: {alias(), [term()]}).
|
|
|
|
-define(ephemeral_alias(TYPE, NAME),
|
|
iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).
|
|
|
|
-define(ActionMaxRetry, 3).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Apply rules
|
|
%%------------------------------------------------------------------------------
|
|
-spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
|
|
apply_rules([], _Input) ->
|
|
ok;
|
|
apply_rules([#rule{enabled = false}|More], Input) ->
|
|
apply_rules(More, Input);
|
|
apply_rules([Rule|More], Input) ->
|
|
_ = apply_rule(Rule, Input),
|
|
apply_rules(More, Input).
|
|
|
|
apply_rule(Rule = #rule{id = RuleId}, Input) ->
|
|
clear_rule_payload(),
|
|
ok = emqx_rule_metrics:inc_rules_matched(RuleId),
|
|
%% Add metadata here caused we need support `metadata` and `rule_id` in SQL
|
|
try do_apply_rule(Rule, emqx_rule_utils:add_metadata(Input, #{rule_id => RuleId}))
|
|
catch
|
|
%% ignore the errors if select or match failed
|
|
_:Reason = {select_and_transform_error, Error} ->
|
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
|
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
|
|
[RuleId, Error]),
|
|
{error, Reason};
|
|
_:Reason = {match_conditions_error, Error} ->
|
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
|
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
|
|
[RuleId, Error]),
|
|
{error, Reason};
|
|
_:Reason = {select_and_collect_error, Error} ->
|
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
|
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
|
|
[RuleId, Error]),
|
|
{error, Reason};
|
|
_:Reason = {match_incase_error, Error} ->
|
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
|
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
|
|
[RuleId, Error]),
|
|
{error, Reason};
|
|
_:Error:StkTrace ->
|
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
|
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
|
|
[RuleId, Error, StkTrace]),
|
|
{error, {Error, StkTrace}}
|
|
end.
|
|
|
|
do_apply_rule(#rule{id = RuleId,
|
|
is_foreach = true,
|
|
fields = Fields,
|
|
doeach = DoEach,
|
|
incase = InCase,
|
|
conditions = Conditions,
|
|
on_action_failed = OnFailed,
|
|
actions = Actions}, Input) ->
|
|
{Selected, Collection} = ?RAISE(select_and_collect(Fields, Input),
|
|
{select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
|
|
ColumnsAndSelected = maps:merge(Input, Selected),
|
|
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
|
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
|
true ->
|
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
|
case Collection2 of
|
|
[] -> emqx_rule_metrics:inc_rules_no_result(RuleId);
|
|
_ -> emqx_rule_metrics:inc_rules_passed(RuleId)
|
|
end,
|
|
{ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]};
|
|
false ->
|
|
ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
|
|
{error, nomatch}
|
|
end;
|
|
|
|
do_apply_rule(#rule{id = RuleId,
|
|
is_foreach = false,
|
|
fields = Fields,
|
|
conditions = Conditions,
|
|
on_action_failed = OnFailed,
|
|
actions = Actions}, Input) ->
|
|
Selected = ?RAISE(select_and_transform(Fields, Input),
|
|
{select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
|
|
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
|
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
|
true ->
|
|
ok = emqx_rule_metrics:inc_rules_passed(RuleId),
|
|
{ok, take_actions(Actions, Selected, Input, OnFailed)};
|
|
false ->
|
|
ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
|
|
{error, nomatch}
|
|
end.
|
|
|
|
clear_rule_payload() ->
|
|
erlang:erase(rule_payload).
|
|
|
|
%% SELECT Clause
|
|
select_and_transform(Fields, Input) ->
|
|
select_and_transform(Fields, Input, #{}).
|
|
|
|
select_and_transform([], _Input, Output) ->
|
|
Output;
|
|
select_and_transform(['*'|More], Input, Output) ->
|
|
select_and_transform(More, Input, maps:merge(Output, Input));
|
|
select_and_transform([{as, Field, Alias}|More], Input, Output) ->
|
|
Val = eval(Field, Input),
|
|
select_and_transform(More,
|
|
nested_put(Alias, Val, Input),
|
|
nested_put(Alias, Val, Output));
|
|
select_and_transform([Field|More], Input, Output) ->
|
|
Val = eval(Field, Input),
|
|
Key = alias(Field),
|
|
select_and_transform(More,
|
|
nested_put(Key, Val, Input),
|
|
nested_put(Key, Val, Output)).
|
|
|
|
%% FOREACH Clause
|
|
-spec select_and_collect(list(), input()) -> {input(), collection()}.
|
|
select_and_collect(Fields, Input) ->
|
|
select_and_collect(Fields, Input, {#{}, {'item', []}}).
|
|
|
|
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) ->
|
|
Val = eval(Field, Input),
|
|
{nested_put(Alias, Val, Output), {A, ensure_list(Val)}};
|
|
select_and_collect([{as, Field, Alias}|More], Input, {Output, LastKV}) ->
|
|
Val = eval(Field, Input),
|
|
select_and_collect(More,
|
|
nested_put(Alias, Val, Input),
|
|
{nested_put(Alias, Val, Output), LastKV});
|
|
select_and_collect([Field], Input, {Output, _}) ->
|
|
Val = eval(Field, Input),
|
|
Key = alias(Field),
|
|
{nested_put(Key, Val, Output), {'item', ensure_list(Val)}};
|
|
select_and_collect([Field|More], Input, {Output, LastKV}) ->
|
|
Val = eval(Field, Input),
|
|
Key = alias(Field),
|
|
select_and_collect(More,
|
|
nested_put(Key, Val, Input),
|
|
{nested_put(Key, Val, Output), LastKV}).
|
|
|
|
%% Filter each item got from FOREACH
|
|
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
|
|
lists:filtermap(
|
|
fun(Item) ->
|
|
InputAndItem = maps:merge(Input, #{CollKey => Item}),
|
|
case ?RAISE(match_conditions(InCase, InputAndItem),
|
|
{match_incase_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
|
true when DoEach == [] -> {true, InputAndItem};
|
|
true ->
|
|
{true, ?RAISE(select_and_transform(DoEach, InputAndItem),
|
|
{doeach_error, {_EXCLASS_,_EXCPTION_,_ST_}})};
|
|
false -> false
|
|
end
|
|
end, CollVal).
|
|
|
|
%% Conditional Clauses such as WHERE, WHEN.
|
|
match_conditions({'and', L, R}, Data) ->
|
|
match_conditions(L, Data) andalso match_conditions(R, Data);
|
|
match_conditions({'or', L, R}, Data) ->
|
|
match_conditions(L, Data) orelse match_conditions(R, Data);
|
|
match_conditions({'not', Var}, Data) ->
|
|
case eval(Var, Data) of
|
|
Bool when is_boolean(Bool) ->
|
|
not Bool;
|
|
_other -> false
|
|
end;
|
|
match_conditions({in, Var, {list, Vals}}, Data) ->
|
|
lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]);
|
|
match_conditions({'fun', {_, Name}, Args}, Data) ->
|
|
apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
|
|
match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
|
|
compare(Op, eval(L, Data), eval(R, Data));
|
|
%%match_conditions({'like', Var, Pattern}, Data) ->
|
|
%% match_like(eval(Var, Data), Pattern);
|
|
match_conditions({}, _Data) ->
|
|
true.
|
|
|
|
%% comparing numbers against strings
|
|
compare(Op, L, R) when L == undefined; R == undefined ->
|
|
do_compare(Op, L, R);
|
|
compare(Op, L, R) when is_number(L), is_binary(R) ->
|
|
do_compare(Op, L, number(R));
|
|
compare(Op, L, R) when is_binary(L), is_number(R) ->
|
|
do_compare(Op, number(L), R);
|
|
compare(Op, L, R) when is_atom(L), is_binary(R) ->
|
|
do_compare(Op, atom_to_binary(L, utf8), R);
|
|
compare(Op, L, R) when is_binary(L), is_atom(R) ->
|
|
do_compare(Op, L, atom_to_binary(R, utf8));
|
|
compare(Op, L, R) ->
|
|
do_compare(Op, L, R).
|
|
|
|
do_compare('=', L, R) -> L == R;
|
|
do_compare('>', L, R) when L == undefined; R == undefined -> false;
|
|
do_compare('>', L, R) -> L > R;
|
|
do_compare('<', L, R) when L == undefined; R == undefined -> false;
|
|
do_compare('<', L, R) -> L < R;
|
|
do_compare('<=', L, R) ->
|
|
do_compare('=', L, R) orelse do_compare('<', L, R);
|
|
do_compare('>=', L, R) ->
|
|
do_compare('=', L, R) orelse do_compare('>', L, R);
|
|
do_compare('<>', L, R) -> L /= R;
|
|
do_compare('!=', L, R) -> L /= R;
|
|
do_compare('=~', undefined, undefined) -> true;
|
|
do_compare('=~', T, F) when T == undefined; F == undefined -> false;
|
|
do_compare('=~', T, F) ->
|
|
emqx_topic:match(T, F).
|
|
|
|
number(Bin) ->
|
|
try binary_to_integer(Bin)
|
|
catch error:badarg -> binary_to_float(Bin)
|
|
end.
|
|
|
|
%% %% Step3 -> Take actions
|
|
%% fallback actions already have `rule_id` in `metadata`
|
|
take_actions(Actions, Selected, Envs, OnFailed) ->
|
|
[take_action(ActInst, Selected, emqx_rule_utils:add_metadata(Envs, ActInst), OnFailed, ?ActionMaxRetry)
|
|
|| ActInst <- Actions].
|
|
|
|
take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
|
|
Selected, Envs, OnFailed, RetryN) when RetryN >= 0 ->
|
|
try
|
|
{ok, #action_instance_params{apply = Apply}}
|
|
= emqx_rule_registry:get_action_instance_params(Id),
|
|
emqx_rule_metrics:inc_actions_taken(Id),
|
|
apply_action_func(Selected, Envs, Apply, ActName)
|
|
of
|
|
{badact, Reason} ->
|
|
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
|
|
Result -> Result
|
|
catch
|
|
error:{badfun, _Func}:_ST ->
|
|
%?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
|
|
% "Func: ~p~nST:~0p", [Id, Func, ST]),
|
|
_ = trans_action_on(Id, fun() ->
|
|
emqx_rule_engine:refresh_actions([ActInst])
|
|
end, 5000),
|
|
emqx_rule_metrics:inc_actions_retry(Id),
|
|
take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
|
|
Error:Reason:Stack ->
|
|
emqx_rule_metrics:inc_actions_exception(Id),
|
|
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
|
|
end;
|
|
|
|
take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
|
|
emqx_rule_metrics:inc_actions_error(Id),
|
|
handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
|
|
|
|
apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
|
|
%% TODO: Build the Func Name when creating the action
|
|
Func = cbk_on_action_triggered(Name),
|
|
Mod:Func(Data, Envs#{'__bindings__' => Bindings});
|
|
apply_action_func(Data, Envs, Func, _Name) when is_function(Func) ->
|
|
erlang:apply(Func, [Data, Envs]).
|
|
|
|
cbk_on_action_triggered(Name) ->
|
|
list_to_atom("on_action_" ++ atom_to_list(Name)).
|
|
|
|
trans_action_on(Id, Callback, Timeout) ->
|
|
case emqx_rule_locker:lock(Id) of
|
|
true -> try Callback() after emqx_rule_locker:unlock(Id) end;
|
|
_ ->
|
|
wait_action_on(Id, Timeout div 10)
|
|
end.
|
|
|
|
wait_action_on(_, 0) ->
|
|
{error, timeout};
|
|
wait_action_on(Id, RetryN) ->
|
|
timer:sleep(10),
|
|
case emqx_rule_registry:get_action_instance_params(Id) of
|
|
not_found ->
|
|
{error, not_found};
|
|
{ok, #action_instance_params{apply = Apply}} ->
|
|
case catch apply_action_func(baddata, #{}, Apply, tryit) of
|
|
{'EXIT', {{badfun, _}, _}} ->
|
|
wait_action_on(Id, RetryN-1);
|
|
_ ->
|
|
ok
|
|
end
|
|
end.
|
|
|
|
handle_action_failure(continue, _Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
|
|
?LOG_RULE_ACTION(error, Metadata, "Continue next action, reason: ~0p", [Reason]),
|
|
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
|
failed;
|
|
handle_action_failure(stop, Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
|
|
?LOG_RULE_ACTION(error, Metadata, "Skip all actions, reason: ~0p", [Reason]),
|
|
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
|
error({take_action_failed, {Id, Reason}}).
|
|
|
|
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
|
nested_get({path, Path}, may_decode_payload(Payload));
|
|
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
|
nested_get({path, Path}, may_decode_payload(Payload));
|
|
eval({path, _} = Path, Input) ->
|
|
nested_get(Path, Input);
|
|
eval({range, {Begin, End}}, _Input) ->
|
|
range_gen(Begin, End);
|
|
eval({get_range, {Begin, End}, Data}, Input) ->
|
|
range_get(Begin, End, eval(Data, Input));
|
|
eval({var, _} = Var, Input) ->
|
|
nested_get(Var, Input);
|
|
eval({const, Val}, _Input) ->
|
|
Val;
|
|
%% unary add
|
|
eval({'+', L}, Input) ->
|
|
eval(L, Input);
|
|
%% unary subtract
|
|
eval({'-', L}, Input) ->
|
|
-(eval(L, Input));
|
|
eval({Op, L, R}, Input) when ?is_arith(Op) ->
|
|
apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
|
|
eval({Op, L, R}, Input) when ?is_comp(Op) ->
|
|
compare(Op, eval(L, Input), eval(R, Input));
|
|
eval({list, List}, Input) ->
|
|
[eval(L, Input) || L <- List];
|
|
eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
|
|
eval_case_clauses(CaseClauses, ElseClauses, Input);
|
|
eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
|
|
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
|
|
eval({'fun', {_, Name}, Args}, Input) ->
|
|
apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
|
|
|
|
handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
|
|
Input#{payload => may_decode_payload(Payload)};
|
|
handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
|
|
Input#{<<"payload">> => may_decode_payload(Payload)};
|
|
handle_alias(_, Input) ->
|
|
Input.
|
|
|
|
alias({var, Var}) ->
|
|
{var, Var};
|
|
alias({const, Val}) when is_binary(Val) ->
|
|
{var, Val};
|
|
alias({list, L}) ->
|
|
{var, ?ephemeral_alias(list, length(L))};
|
|
alias({range, R}) ->
|
|
{var, ?ephemeral_alias(range, R)};
|
|
alias({get_range, _, {var, Key}}) ->
|
|
{var, Key};
|
|
alias({get_range, _, {path, Path}}) ->
|
|
{path, Path};
|
|
alias({path, Path}) ->
|
|
{path, Path};
|
|
alias({const, Val}) ->
|
|
{var, ?ephemeral_alias(const, Val)};
|
|
alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
|
|
{var, ?ephemeral_alias(op, Op)};
|
|
alias({'case', On, _, _}) ->
|
|
{var, ?ephemeral_alias('case', On)};
|
|
alias({'fun', Name, _}) ->
|
|
{var, ?ephemeral_alias('fun', Name)};
|
|
alias(_) ->
|
|
?ephemeral_alias(unknown, unknown).
|
|
|
|
eval_case_clauses([], ElseClauses, Input) ->
|
|
case ElseClauses of
|
|
{} -> undefined;
|
|
_ -> eval(ElseClauses, Input)
|
|
end;
|
|
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
|
|
case match_conditions(Cond, Input) of
|
|
true ->
|
|
eval(Clause, Input);
|
|
_ ->
|
|
eval_case_clauses(CaseClauses, ElseClauses, Input)
|
|
end.
|
|
|
|
eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
|
|
case ElseClauses of
|
|
{} -> undefined;
|
|
_ -> eval(ElseClauses, Input)
|
|
end;
|
|
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
|
|
ConResult = eval(Cond, Input),
|
|
case eval(CaseOn, Input) of
|
|
ConResult ->
|
|
eval(Clause, Input);
|
|
_ ->
|
|
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
|
|
end.
|
|
|
|
apply_func(Name, Args, Input) when is_atom(Name) ->
|
|
do_apply_func(Name, Args, Input);
|
|
apply_func(Name, Args, Input) when is_binary(Name) ->
|
|
FunName =
|
|
try binary_to_existing_atom(Name, utf8)
|
|
catch error:badarg -> error({sql_function_not_supported, Name})
|
|
end,
|
|
do_apply_func(FunName, Args, Input).
|
|
|
|
do_apply_func(Name, Args, Input) ->
|
|
case erlang:apply(emqx_rule_funcs, Name, Args) of
|
|
Func when is_function(Func) ->
|
|
erlang:apply(Func, [Input]);
|
|
Result -> Result
|
|
end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal Functions
|
|
%%------------------------------------------------------------------------------
|
|
may_decode_payload(Payload) when is_binary(Payload) ->
|
|
case get_cached_payload() of
|
|
undefined -> safe_decode_and_cache(Payload);
|
|
DecodedP -> DecodedP
|
|
end;
|
|
may_decode_payload(Payload) ->
|
|
Payload.
|
|
|
|
get_cached_payload() ->
|
|
erlang:get(rule_payload).
|
|
|
|
cache_payload(DecodedP) ->
|
|
erlang:put(rule_payload, DecodedP),
|
|
DecodedP.
|
|
|
|
safe_decode_and_cache(MaybeJson) ->
|
|
try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
|
|
catch
|
|
_:_:_-> error({decode_json_failed, MaybeJson})
|
|
end.
|
|
|
|
ensure_list(List) when is_list(List) -> List;
|
|
ensure_list(_NotList) -> [].
|
|
|
|
nested_put(Alias, Val, Input0) ->
|
|
Input = handle_alias(Alias, Input0),
|
|
emqx_rule_maps:nested_put(Alias, Val, Input).
|