emqx/apps/emqx_rule_engine/src/emqx_rule_engine.erl

652 lines
27 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ load_providers/0
, unload_providers/0
, refresh_resources/0
, refresh_resource/1
, refresh_rule/1
, refresh_rules/0
, refresh_actions/1
, refresh_actions/2
, refresh_resource_status/0
]).
-export([ create_rule/1
, update_rule/1
, delete_rule/1
, create_resource/1
, test_resource/1
, start_resource/1
, get_resource_status/1
, get_resource_params/1
, delete_resource/1
, update_resource/2
]).
-export([ init_resource/4
, init_action/4
, clear_resource/3
, clear_rule/1
, clear_actions/1
, clear_action/3
]).
-type(rule() :: #rule{}).
-type(action() :: #action{}).
-type(resource() :: #resource{}).
-type(resource_type() :: #resource_type{}).
-type(resource_params() :: #resource_params{}).
-type(action_instance_params() :: #action_instance_params{}).
-export_type([ rule/0
, action/0
, resource/0
, resource_type/0
, resource_params/0
, action_instance_params/0
]).
-define(T_RETRY, 60000).
%%------------------------------------------------------------------------------
%% Load resource/action providers from all available applications
%%------------------------------------------------------------------------------
%% @doc Load the providers of every loaded application, except the
%% applications filtered out by ignore_lib_apps/1.
-spec load_providers() -> ok.
load_providers() ->
    Apps = ignore_lib_apps(application:loaded_applications()),
    lists:foreach(fun load_provider/1, Apps).
%% @doc Load the actions and then the resource types provided by one
%% application. Either step failing to return `ok' crashes with badmatch.
-spec load_provider(App :: atom()) -> ok.
load_provider(App) when is_atom(App) ->
    ok = load_actions(App),
    ok = load_resource_types(App),
    ok.
%%------------------------------------------------------------------------------
%% Unload providers
%%------------------------------------------------------------------------------
%% @doc Unload the providers of every loaded application, except the
%% applications filtered out by ignore_lib_apps/1.
-spec unload_providers() -> ok.
unload_providers() ->
    Apps = ignore_lib_apps(application:loaded_applications()),
    lists:foreach(fun unload_provider/1, Apps).
%% @doc Unload one provider: remove its actions from the registry, then
%% unregister its resource types. Both registry calls must return `ok'.
-spec unload_provider(App :: atom()) -> ok.
unload_provider(App) ->
    ok = emqx_rule_registry:remove_actions_of(App),
    ok = emqx_rule_registry:unregister_resource_types_of(App),
    ok.
%% Collect the actions declared by App (see find_actions/1) and add them
%% to the rule registry. Returns whatever the registry call returns.
load_actions(App) ->
    emqx_rule_registry:add_actions(find_actions(App)).
%% Collect the resource types declared by App (see find_resource_types/1)
%% and register them. Returns whatever the registry call returns.
load_resource_types(App) ->
    emqx_rule_registry:register_resource_types(find_resource_types(App)).
%% Build one #action{} per `rule_action' module attribute found in App.
-spec find_actions(App :: atom()) -> list(action()).
find_actions(App) ->
    [new_action(Attr) || Attr <- find_attrs(App, rule_action)].
%% Build one #resource_type{} per `resource_type' module attribute found
%% in App.
-spec find_resource_types(App :: atom()) -> list(resource_type()).
find_resource_types(App) ->
    [new_resource_type(Attr) || Attr <- find_attrs(App, resource_type)].
%% Build an #action{} record from an {App, Module, AttributeMap} triple.
%%
%% The attribute map must contain name, for, types, create and params;
%% category, hidden, destroy, title and description are optional and fall
%% back to `other', `false', `undefined' and the ?descr macro respectively.
%% The params spec is validated before the record is built; a validator
%% result other than `ok' crashes with badmatch.
new_action({App, Mod, #{name := Name,
                        for := Hook,
                        types := Types,
                        create := Create,
                        params := ParamsSpec} = Attr}) ->
    ok = emqx_rule_validator:validate_spec(ParamsSpec),
    Opt = fun(Key, Default) -> maps:get(Key, Attr, Default) end,
    #action{name = Name,
            for = Hook,
            app = App,
            types = Types,
            category = Opt(category, other),
            module = Mod,
            on_create = Create,
            hidden = Opt(hidden, false),
            on_destroy = Opt(destroy, undefined),
            params_spec = ParamsSpec,
            title = Opt(title, ?descr),
            description = Opt(description, ?descr)}.
%% Build a #resource_type{} record from an {App, Module, AttributeMap}
%% triple.
%%
%% name, params and create are mandatory keys of the attribute map.
%% status and destroy are optional; when absent the corresponding callback
%% is stored as {Mod, undefined}. title and description default to ?descr.
%% The params spec is validated before the record is built.
new_resource_type({App, Mod, #{name := Name,
                               params := ParamsSpec,
                               create := Create} = Attr}) ->
    ok = emqx_rule_validator:validate_spec(ParamsSpec),
    Opt = fun(Key, Default) -> maps:get(Key, Attr, Default) end,
    #resource_type{name = Name,
                   provider = App,
                   params_spec = ParamsSpec,
                   on_create = {Mod, Create},
                   on_status = {Mod, Opt(status, undefined)},
                   on_destroy = {Mod, Opt(destroy, undefined)},
                   title = Opt(title, ?descr),
                   description = Opt(description, ?descr)}.
%% Return {App, Module, Attr} for every value of the module attribute Def
%% found in any module listed under the `modules' key of App.
%% Yields [] when the application key cannot be read.
find_attrs(App, Def) ->
    case application:get_key(App, modules) of
        {ok, Modules} ->
            [{App, Mod, Attr} || Mod <- Modules,
                                 {Name, Attrs} <- module_attributes(Mod),
                                 Name =:= Def,
                                 Attr <- Attrs];
        _ ->
            []
    end.
%% Return the attribute list of Module, or [] when the call to
%% Module:module_info/1 fails with `undef' (e.g. the module cannot be
%% loaded). Any other exception propagates.
module_attributes(Module) ->
    try Module:module_info(attributes) of
        Attributes -> Attributes
    catch
        error:undef -> []
    end.
%%------------------------------------------------------------------------------
%% APIs for rules and resources
%%------------------------------------------------------------------------------
-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
%% @doc Create a rule from Params and add it to the rule registry.
%%
%% Params must contain `rawsql' and `actions'. Optional keys are `id',
%% `enabled' (default true), `on_action_failed' (default continue) and
%% `description' (default "").
%%
%% Returns {ok, Rule} on success. Returns {error, Reason} when the SQL
%% cannot be parsed (Reason is whatever parse_select/1 returned) or when
%% preparing the actions throws (Reason is the thrown term, for example
%% {action_not_found, Name}).
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params = #{rawsql := Sql, actions := ActArgs}) ->
    case emqx_rule_sqlparser:parse_select(Sql) of
        {ok, Select} ->
            %% Only generate a fresh id when the caller did not supply one;
            %% rule_id/0 probes the registry, so it must not run eagerly.
            RuleId = case Params of
                         #{id := Id} -> Id;
                         #{} -> rule_id()
                     end,
            Enabled = maps:get(enabled, Params, true),
            %% Only prepare_actions/2 is protected by the try; failures in
            %% the `of' section (registry, metrics) are not converted.
            try prepare_actions(ActArgs, Enabled) of
                Actions ->
                    Rule = #rule{
                        id = RuleId,
                        rawsql = Sql,
                        for = emqx_rule_sqlparser:select_from(Select),
                        is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
                        fields = emqx_rule_sqlparser:select_fields(Select),
                        doeach = emqx_rule_sqlparser:select_doeach(Select),
                        incase = emqx_rule_sqlparser:select_incase(Select),
                        conditions = emqx_rule_sqlparser:select_where(Select),
                        on_action_failed = maps:get(on_action_failed, Params, continue),
                        actions = Actions,
                        enabled = Enabled,
                        created_at = erlang:system_time(millisecond),
                        description = maps:get(description, Params, ""),
                        state = normal
                    },
                    ok = emqx_rule_registry:add_rule(Rule),
                    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
                    {ok, Rule}
            catch
                %% Covers {action_not_found, Name} as well as any other throw.
                throw:Reason ->
                    {error, Reason}
            end;
        Reason -> {error, Reason}
    end.
%% @doc Update an existing rule with the supported keys found in Params.
%%
%% Returns {ok, UpdatedRule} after storing the updated rule in the
%% registry, {error, {not_found, RuleId}} when no such rule exists, or
%% {error, Reason} when applying the parameters throws Reason.
%% Only may_update_rule_params/2 is protected by the try; a failing
%% registry write is not converted into an error tuple.
-spec(update_rule(#{id := binary(), _=>_}) ->
        {ok, rule()} | {error, {not_found, rule_id()}} | {error, term()}).
update_rule(Params = #{id := RuleId}) ->
    case emqx_rule_registry:get_rule(RuleId) of
        {ok, Rule0} ->
            try may_update_rule_params(Rule0, Params) of
                Rule ->
                    ok = emqx_rule_registry:add_rule(Rule),
                    {ok, Rule}
            catch
                throw:Reason ->
                    {error, Reason}
            end;
        not_found ->
            {error, {not_found, RuleId}}
    end.
%% @doc Delete a rule. A rule that does not exist is treated as success.
%%
%% The rule is first cleared through ?CLUSTER_CALL(clear_rule, ...) and
%% then removed from the registry. If either step raises, the failure is
%% logged and the rule's actions are re-initialised via refresh_actions/1;
%% the rule then stays in the registry.
-spec(delete_rule(RuleId :: rule_id()) -> ok).
delete_rule(RuleId) ->
    case emqx_rule_registry:get_rule(RuleId) of
        not_found ->
            ok;
        {ok, #rule{actions = Actions} = Rule} ->
            try
                _ = ?CLUSTER_CALL(clear_rule, [Rule]),
                ok = emqx_rule_registry:remove_rule(Rule)
            catch
                Class:Reason:Stacktrace ->
                    ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Class, Reason, Stacktrace}]),
                    refresh_actions(Actions)
            end
    end.
%% @doc Create a resource of the given type and store it in the registry.
%%
%% Params must contain `type' and `config'; `id' and `description' are
%% optional. The config is validated against the params spec of the
%% resource type before the resource is stored.
%%
%% Returns {ok, Resource}, or {error, {resource_type_not_found, Type}}
%% when the type is not registered. Validation failures propagate as
%% raised by emqx_rule_validator:validate_params/2.
-spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}).
create_resource(#{type := Type, config := Config0} = Params) ->
    case emqx_rule_registry:find_resource_type(Type) of
        {ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
            Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
            %% Only generate a fresh id when the caller did not supply one;
            %% resource_id/0 probes the registry, so it must not run eagerly.
            ResId = case Params of
                        #{id := Id} -> Id;
                        #{} -> resource_id()
                    end,
            Resource = #resource{id = ResId,
                                 type = Type,
                                 config = Config,
                                 description = iolist_to_binary(maps:get(description, Params, "")),
                                 created_at = erlang:system_time(millisecond)
                                },
            ok = emqx_rule_registry:add_resource(Resource),
            %% Initialisation is best-effort: {ok, Resource} is returned even
            %% when it fails. The failure is logged instead of being dropped.
            %% NOTE(review): the original comment says a timer re-starts the
            %% resource later; no such timer is started in this function.
            try
                _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
                ok
            catch
                Class:Reason:Stacktrace ->
                    ?LOG(error, "init_resource ~p failed: ~0p",
                         [ResId, {Class, Reason, Stacktrace}])
            end,
            {ok, Resource};
        not_found ->
            {error, {resource_type_not_found, Type}}
    end.
%% @doc Update a resource, but only when no enabled rule depends on it.
%%
%% Returns {error, {dependent_rules_exists, RuleIds}} when enabled rules
%% still use the resource; otherwise the result of
%% check_and_update_resource/2.
-spec(update_resource(resource_id(), map()) -> ok | {error, Reason :: term()}).
update_resource(ResId, NewParams) ->
    case emqx_rule_registry:find_enabled_rules_depends_on_resource(ResId) of
        [] ->
            check_and_update_resource(ResId, NewParams);
        DependentRules ->
            RuleIds = [Id || #rule{id = Id} <- DependentRules],
            {error, {dependent_rules_exists, RuleIds}}
    end.
%% Merge NewParams over the stored resource and apply the update.
%%
%% NewParams uses binary keys: <<"config">> and <<"description">>; a
%% missing key keeps the stored value. Any exception raised during the
%% update is logged and turned into {error, Reason}, where Reason is the
%% exception reason. Returns {error, not_found} when the registry lookup
%% does not return a resource with this id.
check_and_update_resource(Id, NewParams) ->
    case emqx_rule_registry:find_resource(Id) of
        {ok, #resource{id = Id, type = Type, config = OldConfig, description = OldDescr}} ->
            try
                Updated = #{id => Id,
                            type => Type,
                            config => maps:get(<<"config">>, NewParams, OldConfig),
                            description => maps:get(<<"description">>, NewParams, OldDescr)},
                do_check_and_update_resource(Updated)
            catch
                Class:Reason:Stacktrace ->
                    ?LOG(error, "check_and_update_resource failed: ~0p", [{Class, Reason, Stacktrace}]),
                    {error, Reason}
            end;
        _NotFound ->
            {error, not_found}
    end.
%% Validate the new config, test it with a temporary resource, then
%% re-initialise the real resource and overwrite its registry entry.
%%
%% Returns ok on success. Raises error({error, Reason}) when the test
%% fails.
%% NOTE(review): the caller turns the exception reason into
%% {error, Reason}, so a failed test surfaces as {error, {error, Reason}}
%% - confirm whether the double wrapping is intended.
%% NOTE(review): there is no clause for an unknown resource type, so that
%% case raises case_clause.
do_check_and_update_resource(#{id := Id, type := Type, description := NewDescription,
                               config := NewConfig}) ->
    case emqx_rule_registry:find_resource_type(Type) of
        {ok, #resource_type{on_create = {Module, Create},
                            params_spec = ParamSpec}} ->
            Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
            %% The test is run with the raw config; test_resource/1 performs
            %% its own validation.
            case test_resource(#{type => Type, config => NewConfig}) of
                {error, Reason} ->
                    error({error, Reason});
                ok ->
                    _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
                    Updated = #resource{id = Id,
                                        type = Type,
                                        config = Config,
                                        description = NewDescription,
                                        created_at = erlang:system_time(millisecond)},
                    emqx_rule_registry:add_resource(Updated),
                    ok
            end
    end.
%% @doc (Re-)initialise a stored resource on the local node and refresh
%% every action instance that references it.
%%
%% Returns {error, {resource_not_found, ResId}} for an unknown id and
%% {error, Reason} when initialisation or the refresh throws Reason.
%% A resource whose type is not registered crashes with badmatch.
-spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).
start_resource(ResId) ->
    case emqx_rule_registry:find_resource(ResId) of
        not_found ->
            {error, {resource_not_found, ResId}};
        {ok, #resource{type = ResType, config = Config}} ->
            {ok, #resource_type{on_create = {Mod, Create}}} =
                emqx_rule_registry:find_resource_type(ResType),
            try
                init_resource(Mod, Create, ResId, Config),
                refresh_actions_of_a_resource(ResId)
            catch
                throw:Reason ->
                    {error, Reason}
            end
    end.
%% @doc Check that a resource of the given type can be created with the
%% given config, using a temporary resource id.
%%
%% The config is validated, the resource is initialised through
%% ?CLUSTER_CALL(init_resource, ...) and then cleared again through
%% ?CLUSTER_CALL(clear_resource, ...).
%%
%% Returns ok when both steps succeed, {error, Reason} when one of them
%% throws Reason, and {error, {resource_type_not_found, Type}} for an
%% unregistered type.
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config0}) ->
    case emqx_rule_registry:find_resource_type(Type) of
        {ok, #resource_type{on_create = {ModC, Create},
                            on_destroy = {ModD, Destroy},
                            params_spec = ParamSpec}} ->
            Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
            ResId = resource_id(),
            InitResult =
                try
                    _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
                    ok
                catch
                    throw:InitReason -> {error, InitReason}
                end,
            %% Always clean up, even when initialisation threw: wherever
            %% init_resource/4 did complete, the temporary resource params
            %% are stored and would otherwise be left behind.
            try
                _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
                InitResult
            catch
                %% A cleanup failure is only reported when init succeeded;
                %% otherwise the original init error takes precedence.
                throw:ClearReason when InitResult =:= ok -> {error, ClearReason};
                throw:_ -> InitResult
            end;
        not_found ->
            {error, {resource_type_not_found, Type}}
    end.
%% @doc Fetch the current status of a resource through the `on_status'
%% callback of its resource type.
%%
%% Returns {error, {resource_not_found, ResId}} for an unknown id.
%% A resource whose type is not registered crashes with badmatch.
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) ->
    case emqx_rule_registry:find_resource(ResId) of
        not_found ->
            {error, {resource_not_found, ResId}};
        {ok, #resource{type = ResType}} ->
            {ok, #resource_type{on_status = {Mod, OnStatus}}} =
                emqx_rule_registry:find_resource_type(ResType),
            {ok, fetch_resource_status(Mod, OnStatus, ResId)}
    end.
%% @doc Return the stored params of a resource (as written by
%% init_resource/4), or {error, resource_not_initialized} when the
%% registry has none for this id.
-spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}).
get_resource_params(ResId) ->
    case emqx_rule_registry:find_resource_params(ResId) of
        not_found ->
            {error, resource_not_initialized};
        {ok, #resource_params{params = ResParams}} ->
            {ok, ResParams}
    end.
%% @doc Delete a resource: remove it from the registry first and, if that
%% succeeds, clear it through ?CLUSTER_CALL(clear_resource, ...).
%%
%% Returns {error, not_found} for an unknown id, the registry's error
%% tuple when removal is refused, and {error, Reason} when either step
%% throws Reason. A resource whose type is not registered crashes with
%% badmatch.
-spec(delete_resource(resource_id()) -> ok | {error, Reason :: term()}).
delete_resource(ResId) ->
    case emqx_rule_registry:find_resource(ResId) of
        not_found ->
            {error, not_found};
        {ok, #resource{type = ResType}} ->
            {ok, #resource_type{on_destroy = {ModD, Destroy}}} =
                emqx_rule_registry:find_resource_type(ResType),
            try
                case emqx_rule_registry:remove_resource(ResId) of
                    {error, _} = Refused ->
                        Refused;
                    ok ->
                        _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
                        ok
                end
            catch
                throw:Reason ->
                    {error, Reason}
            end
    end.
%%------------------------------------------------------------------------------
%% Re-establish resources
%%------------------------------------------------------------------------------
%% @doc Re-initialise every resource stored in the registry, one by one,
%% via refresh_resource/1.
-spec refresh_resources() -> ok.
refresh_resources() ->
    Resources = emqx_rule_registry:get_resources(),
    lists:foreach(fun refresh_resource/1, Resources).
%% Re-initialise resources on the local node.
%%
%% Given a resource type name (an atom), refreshes every resource of that
%% type. Given a #resource{} record, looks up its type and calls
%% init_resource/4; on any failure a retrier is requested from
%% emqx_rule_monitor with a delay of ?T_RETRY instead of crashing.
refresh_resource(Type) when is_atom(Type) ->
    ResourcesOfType = emqx_rule_registry:get_resources_by_type(Type),
    lists:foreach(fun refresh_resource/1, ResourcesOfType);
refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
    try
        {ok, #resource_type{on_create = {Mod, Create}}} =
            emqx_rule_registry:find_resource_type(Type),
        ok = emqx_rule_engine:init_resource(Mod, Create, ResId, Config)
    catch
        _:_ ->
            emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY)
    end.
%% @doc Refresh every enabled rule in the registry.
%%
%% A rule whose refresh raises is stored back as disabled with state
%% `refresh_failed_at_bootup'. Rules that are not enabled are skipped.
-spec refresh_rules() -> ok.
refresh_rules() ->
    RefreshOne =
        fun(#rule{enabled = true} = Rule) ->
                try
                    refresh_rule(Rule)
                catch
                    _:_ ->
                        emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
                end;
           (_NotEnabled) ->
                ok
        end,
    lists:foreach(RefreshOne, emqx_rule_registry:get_rules()).
%% Bring one rule back into service on the local node: create its
%% metrics, call emqx_rule_events:load/1 for each entry of the rule's
%% `for' field, then re-initialise all of its actions.
refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
    lists:foreach(fun(Topic) -> emqx_rule_events:load(Topic) end, Topics),
    refresh_actions(Actions).
%% @doc Re-fetch the status of every stored resource whose type is
%% registered. Resources with an unknown type are skipped.
-spec refresh_resource_status() -> ok.
refresh_resource_status() ->
    Refresh =
        fun(#resource{id = ResId, type = ResType}) ->
                case emqx_rule_registry:find_resource_type(ResType) of
                    {ok, #resource_type{on_status = {Mod, OnStatus}}} ->
                        _ = fetch_resource_status(Mod, OnStatus, ResId);
                    _UnknownType ->
                        ok
                end
        end,
    lists:foreach(Refresh, emqx_rule_registry:get_resources()).
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
%% Turn a list of action argument maps into #action_instance{} records,
%% preserving order. NeedInit is passed through to prepare_action/2.
prepare_actions(Actions, NeedInit) ->
    [prepare_action(ActionArgs, NeedInit) || ActionArgs <- Actions].
%% Build one #action_instance{} from an action argument map.
%%
%% The args are validated against the params spec of the registered
%% action. When NeedInit is true the instance is also initialised through
%% ?CLUSTER_CALL(init_action, ...). Fallback actions are prepared
%% recursively with the same NeedInit.
%% Throws {action_not_found, Name} when the action is not registered.
prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
    case emqx_rule_registry:find_action(Name) of
        not_found ->
            throw({action_not_found, Name});
        {ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} ->
            Args = emqx_rule_validator:validate_params(Args0, ParamSpec),
            InstId = maps:get(id, Action, action_instance_id(Name)),
            case NeedInit of
                false ->
                    ok;
                true ->
                    InitArgs = [Mod, Create, InstId, with_resource_params(Args)],
                    _ = ?CLUSTER_CALL(init_action, InitArgs),
                    ok
            end,
            Fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit),
            #action_instance{id = InstId,
                             name = Name,
                             args = Args,
                             fallbacks = Fallbacks}
    end.
%% When Args references a resource under the <<"$resource">> key, merge
%% the stored params of that resource into Args (resource params win on
%% key clashes, as with maps:merge/2). Args without that key are returned
%% unchanged.
%% Throws {resource_not_initialized, ResId} when no params are stored.
with_resource_params(#{<<"$resource">> := ResId} = Args) ->
    case emqx_rule_registry:find_resource_params(ResId) of
        not_found ->
            throw({resource_not_initialized, ResId});
        {ok, #resource_params{params = ResParams}} ->
            maps:merge(Args, ResParams)
    end;
with_resource_params(Args) ->
    Args.
-dialyzer([{nowarn_function, may_update_rule_params/2}]).
%% Apply the supported keys of Params to Rule, one key per clause, and
%% return the updated rule. Each clause removes the key it handled and
%% recurses, so the keys are processed in clause order: rawsql, enabled,
%% description, on_action_failed, actions. Unsupported keys are ignored.
%%
%% Throws the parser result when `rawsql' cannot be parsed; throws from
%% the cluster calls and from prepare_actions/2 propagate as well.
may_update_rule_params(Rule, Params = #{rawsql := SQL}) ->
    case emqx_rule_sqlparser:parse_select(SQL) of
        {ok, Select} ->
            may_update_rule_params(
                Rule#rule{
                    rawsql = SQL,
                    for = emqx_rule_sqlparser:select_from(Select),
                    is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
                    fields = emqx_rule_sqlparser:select_fields(Select),
                    doeach = emqx_rule_sqlparser:select_doeach(Select),
                    incase = emqx_rule_sqlparser:select_incase(Select),
                    conditions = emqx_rule_sqlparser:select_where(Select)
                },
                maps:remove(rawsql, Params));
        Reason -> throw(Reason)
    end;
may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions, state = OldState},
                       Params = #{enabled := NewEnb}) ->
    State = case {OldEnb, NewEnb} of
                {false, true} ->
                    %% Being enabled: bring the rule up on the cluster.
                    _ = ?CLUSTER_CALL(refresh_rule, [Rule]),
                    force_changed;
                {true, false} ->
                    %% Being disabled: tear down its action instances.
                    _ = ?CLUSTER_CALL(clear_actions, [Actions]),
                    force_changed;
                _NoChange -> OldState
            end,
    may_update_rule_params(Rule#rule{enabled = NewEnb, state = State}, maps:remove(enabled, Params));
may_update_rule_params(Rule, Params = #{description := Descr}) ->
    may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params));
may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
    may_update_rule_params(Rule#rule{on_action_failed = OnFailed},
                           maps:remove(on_action_failed, Params));
may_update_rule_params(Rule = #rule{enabled = Enabled, actions = OldActions},
                       Params = #{actions := Actions}) ->
    %% The `enabled' key has already been consumed and removed from Params
    %% by the clause above, so the rule record is the only place that still
    %% knows whether the rule is enabled. Reading it from Params would
    %% always yield the default and initialise actions of disabled rules.
    %% prepare new actions before removing old ones
    NewActions = prepare_actions(Actions, Enabled),
    _ = ?CLUSTER_CALL(clear_actions, [OldActions]),
    may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
    Rule.
%% Given a list of {Name, Description, Version} application tuples,
%% return the names of those that are not in the fixed list of library
%% applications below, in their original order. Elements that are not
%% 3-tuples are dropped.
ignore_lib_apps(Apps) ->
    LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
               syntax_tools, ssl, crypto, mnesia, os_mon,
               inets, goldrush, gproc, runtime_tools,
               snmp, otp_mibs, public_key, asn1, ssh, hipe,
               common_test, observer, webtool, xmerl, tools,
               test_server, compiler, debugger, eunit, et,
               wx],
    KeepName =
        fun({AppName, _Descr, _Vsn}) ->
                case lists:member(AppName, LibApps) of
                    true -> false;
                    false -> {true, AppName}
                end;
           (_Other) ->
                false
        end,
    lists:filtermap(KeepName, Apps).
%% Generate a resource id with the "resource:" prefix for which
%% emqx_rule_registry:find_resource/1 returns not_found.
resource_id() ->
    Lookup = fun emqx_rule_registry:find_resource/1,
    gen_id("resource:", Lookup).
%% Generate a rule id with the "rule:" prefix for which
%% emqx_rule_registry:get_rule/1 returns not_found.
rule_id() ->
    Lookup = fun emqx_rule_registry:get_rule/1,
    gen_id("rule:", Lookup).
%% Generate a binary id made of Prefix followed by emqx_rule_id:gen().
%% TestFun is called with each candidate; the candidate is returned when
%% TestFun answers not_found, otherwise a new candidate is tried.
gen_id(Prefix, TestFun) ->
    Candidate = iolist_to_binary([Prefix, emqx_rule_id:gen()]),
    case TestFun(Candidate) of
        not_found -> Candidate;
        _Taken -> gen_id(Prefix, TestFun)
    end.
%% Build an action instance id of the form <<"Name_Timestamp">>, where
%% Timestamp is erlang:system_time/0 (native time unit) as decimal text.
action_instance_id(ActionName) ->
    Timestamp = integer_to_list(erlang:system_time()),
    list_to_binary(atom_to_list(ActionName) ++ "_" ++ Timestamp).
%% Create a resource by calling Module:OnCreate(ResId, Config) and store
%% the value it returns as the resource's params, with the status
%% initialised to #{is_alive => true}.
%% The callback is wrapped in ?RAISE; the macro is defined outside this
%% file - presumably it re-raises failures as a throw carrying
%% {{Module, OnCreate}, {Class, Reason, Stacktrace}} (TODO confirm).
init_resource(Module, OnCreate, ResId, Config) ->
    Created = ?RAISE(Module:OnCreate(ResId, Config),
                     {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
    emqx_rule_registry:add_resource_params(
      #resource_params{id = ResId,
                       params = Created,
                       status = #{is_alive => true}}).
%% Initialise one action instance: create its metrics, call
%% Module:OnCreate(ActionInstId, Params) and store the outcome as the
%% instance's params in the registry.
%%
%% Three callback return shapes are accepted:
%%   {Fun, NewParams}      - Fun is stored as `apply' (BACKW: =< e4.2.2)
%%   {Bindings, NewParams} - Bindings is a list; stored as
%%                           #{mod => Module, bindings => Map}
%%   Fun                   - stored with the original Params
%%                           (BACKW: =< e4.2.2)
%% Any other return value raises case_clause.
init_action(Module, OnCreate, ActionInstId, Params) ->
    ok = emqx_rule_metrics:create_metrics(ActionInstId),
    Created = ?RAISE(Module:OnCreate(ActionInstId, Params),
                     {{init_action_failure, node()},
                      {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}),
    InstParams =
        case Created of
            {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2
                #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply};
            {Bindings, NewParams} when is_list(Bindings) ->
                #action_instance_params{
                   id = ActionInstId, params = NewParams,
                   apply = #{mod => Module, bindings => maps:from_list(Bindings)}};
            Apply when is_function(Apply) -> %% BACKW: =< e4.2.2
                #action_instance_params{id = ActionInstId, params = Params, apply = Apply}
        end,
    ok = emqx_rule_registry:add_action_instance_params(InstParams).
%% Tear down a resource on the local node.
%%
%% Without a destroy callback only the stored resource params are
%% removed. With one, Module:Destroy(ResId, Params) is called first
%% (wrapped in ?RAISE) and the params are removed afterwards; when no
%% params are stored nothing is done.
clear_resource(_Module, undefined, ResId) ->
    ok = emqx_rule_registry:remove_resource_params(ResId);
clear_resource(Module, Destroy, ResId) ->
    case emqx_rule_registry:find_resource_params(ResId) of
        not_found ->
            ok;
        {ok, #resource_params{params = ResParams}} ->
            ?RAISE(Module:Destroy(ResId, ResParams),
                   {{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
            ok = emqx_rule_registry:remove_resource_params(ResId)
    end.
%% Tear down a rule on the local node: clear all of its action instances,
%% then clear the rule's metrics. Always returns ok when neither step
%% raises.
clear_rule(#rule{id = RuleId, actions = Actions}) ->
    _ = clear_actions(Actions),
    _ = emqx_rule_metrics:clear_rule_metrics(RuleId),
    ok.
%% Clear every action instance in the list and, recursively, its
%% fallback instances. An instance whose action is no longer registered
%% crashes with badmatch.
clear_actions(Actions) ->
    ClearOne =
        fun(#action_instance{id = InstId, name = ActName, fallbacks = Fallbacks}) ->
                {ok, #action{module = Mod, on_destroy = Destroy}} =
                    emqx_rule_registry:find_action(ActName),
                clear_action(Mod, Destroy, InstId),
                clear_actions(Fallbacks)
        end,
    lists:foreach(ClearOne, Actions).
%% Tear down one action instance on the local node.
%%
%% Without a destroy callback the instance's metrics and stored params
%% are removed. With one, the callback is only used when Module exports
%% Destroy/2: metrics are cleared, Module:Destroy(ActionInstId, Params)
%% is called (wrapped in ?RAISE) and the stored params are removed.
%% NOTE(review): when Destroy/2 is not exported nothing is cleaned up,
%% not even metrics or stored params - confirm this is intended.
clear_action(_Module, undefined, ActionInstId) ->
    emqx_rule_metrics:clear_metrics(ActionInstId),
    ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
clear_action(Module, Destroy, ActionInstId) ->
    case erlang:function_exported(Module, Destroy, 2) of
        false ->
            ok;
        true ->
            emqx_rule_metrics:clear_metrics(ActionInstId),
            case emqx_rule_registry:get_action_instance_params(ActionInstId) of
                not_found ->
                    ok;
                {ok, #action_instance_params{params = InstParams}} ->
                    ?RAISE(Module:Destroy(ActionInstId, InstParams),{{destroy_action_failure, node()},
                           {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
                    ok = emqx_rule_registry:remove_action_instance_params(ActionInstId)
            end
    end.
%% Query the status of a resource via Module:OnStatus(ResId, Params),
%% store it with the resource params and return it.
%%
%% The new `is_alive' value is compared with the stored one. When it is
%% unchanged nothing else happens; when it changed to true the
%% "resource down" alarm is deactivated, when it changed to false the
%% alarm is activated. Any exception raised while querying or while
%% handling the alarm is logged and the status becomes
%% #{is_alive => false}.
%% Returns #{is_alive => false} without storing anything when no params
%% are stored for the resource.
fetch_resource_status(Module, OnStatus, ResId) ->
    case emqx_rule_registry:find_resource_params(ResId) of
        not_found ->
            #{is_alive => false};
        {ok, #resource_params{params = Params,
                              status = #{is_alive := WasAlive}} = ResParams} ->
            %% The case stays inside the try body so that failures in the
            %% alarm handling are caught as well.
            Current =
                try
                    case Module:OnStatus(ResId, Params) of
                        #{is_alive := WasAlive} = Unchanged ->
                            Unchanged;
                        #{is_alive := true} = NowUp ->
                            {ok, UpType} = find_type(ResId),
                            emqx_alarm:deactivate(alarm_name_of_resource_down(UpType, ResId)),
                            NowUp;
                        #{is_alive := false} = NowDown ->
                            {ok, DownType} = find_type(ResId),
                            AlarmName = alarm_name_of_resource_down(DownType, ResId),
                            emqx_alarm:activate(AlarmName, #{id => ResId, type => DownType}),
                            NowDown
                    end
                catch
                    _Class:Reason:Stacktrace ->
                        ?LOG(error, "get resource status for ~p failed: ~0p", [ResId, {Reason, Stacktrace}]),
                        #{is_alive => false}
                end,
            emqx_rule_registry:add_resource_params(ResParams#resource_params{status = Current}),
            Current
    end.
%% Re-initialise, across all rules in the registry, the action instances
%% whose args reference ResId under the <<"$resource">> key.
refresh_actions_of_a_resource(ResId) ->
    UsesResource =
        fun(#action_instance{args = #{<<"$resource">> := UsedId}}) ->
                UsedId =:= ResId;
           (_Other) ->
                false
        end,
    lists:foreach(
      fun(#rule{actions = Actions}) ->
              refresh_actions(Actions, UsesResource)
      end,
      emqx_rule_registry:get_rules()).
%% Re-initialise every action instance in the list (no filtering).
refresh_actions(Actions) ->
    AcceptAll = fun(_ActionInst) -> true end,
    refresh_actions(Actions, AcceptAll).
%% Re-initialise the action instances for which Pred returns true.
%%
%% For a selected instance the action is looked up, init_action/4 is
%% called with the instance args merged with its resource params, and the
%% instance's fallbacks are then refreshed with the same Pred. Fallbacks
%% of an instance rejected by Pred are not visited.
refresh_actions(Actions, Pred) ->
    RefreshOne =
        fun(#action_instance{id = InstId, name = ActName, args = Args,
                             fallbacks = Fallbacks} = ActionInst) ->
                case Pred(ActionInst) of
                    false ->
                        ok;
                    true ->
                        {ok, #action{module = Mod, on_create = Create}} =
                            emqx_rule_registry:find_action(ActName),
                        _ = init_action(Mod, Create, InstId, with_resource_params(Args)),
                        refresh_actions(Fallbacks, Pred)
                end
        end,
    lists:foreach(RefreshOne, Actions).
%% Return {ok, Type} for a stored resource. Crashes with badmatch when
%% the resource cannot be found.
find_type(ResId) ->
    {ok, #resource{type = ResType}} = emqx_rule_registry:find_resource(ResId),
    {ok, ResType}.
%% Build the alarm name <<"resource/Type/ResId/down">> as a binary.
alarm_name_of_resource_down(Type, ResId) ->
    AlarmName = io_lib:format("resource/~s/~s/down", [Type, ResId]),
    iolist_to_binary(AlarmName).