%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_engine). -behaviour(gen_server). -behaviour(emqx_config_handler). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/qlc.hrl"). -export([start_link/0]). -export([ post_config_update/5, config_key_path/0 ]). %% Rule Management -export([load_rules/0]). -export([ create_rule/1, insert_rule/1, update_rule/1, delete_rule/1, get_rule/1 ]). -export([ get_rules/0, get_rules_for_topic/1, get_rules_with_same_event/1, get_rule_ids_by_action/1, ensure_action_removed/2, get_rules_ordered_by_ts/0 ]). %% exported for cluster_call -export([ do_delete_rule/1, do_insert_rule/1 ]). -export([ load_hooks_for_rule/1, unload_hooks_for_rule/1, maybe_add_metrics_for_rule/1, clear_metrics_for_rule/1, reset_metrics_for_rule/1 ]). %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). -export([now_ms/0]). %% gen_server Callbacks -export([ init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3 ]). -define(RULE_ENGINE, ?MODULE). -define(T_CALL, infinity). %% NOTE: This order cannot be changed! This is to make the metric working during relup. %% Append elements to this list to add new metrics. -define(METRICS, [ 'matched', 'passed', 'failed', 'failed.exception', 'failed.no_result', 'actions.total', 'actions.success', 'actions.failed', 'actions.failed.out_of_service', 'actions.failed.unknown' ]). -define(RATE_METRICS, ['matched']). -type action_name() :: binary() | #{function := binary()}. config_key_path() -> [rule_engine, rules]. -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% The config handler for emqx_rule_engine %%------------------------------------------------------------------------------ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = emqx_map_lib:diff_maps(NewRules, OldRules), maps_foreach( fun({Id, {_Old, New}}) -> {ok, _} = update_rule(New#{id => bin(Id)}) end, Updated ), maps_foreach( fun({Id, _Rule}) -> ok = delete_rule(bin(Id)) end, Removed ), maps_foreach( fun({Id, Rule}) -> {ok, _} = create_rule(Rule#{id => bin(Id)}) end, Added ), {ok, get_rules()}. %%------------------------------------------------------------------------------ %% APIs for rules %%------------------------------------------------------------------------------ -spec load_rules() -> ok. load_rules() -> maps_foreach( fun ({Id, #{metadata := #{created_at := CreatedAt}} = Rule}) -> create_rule(Rule#{id => bin(Id)}, CreatedAt); ({Id, Rule}) -> create_rule(Rule#{id => bin(Id)}) end, emqx:get_config([rule_engine, rules], #{}) ). -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params) -> create_rule(Params, now_ms()). create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) -> case get_rule(RuleId) of not_found -> parse_and_insert(Params, CreatedAt); {ok, _} -> {error, already_exists} end. -spec update_rule(map()) -> {ok, rule()} | {error, term()}. update_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> case get_rule(RuleId) of not_found -> {error, not_found}; {ok, #{created_at := CreatedAt}} -> parse_and_insert(Params, CreatedAt) end. -spec delete_rule(RuleId :: rule_id()) -> ok. delete_rule(RuleId) when is_binary(RuleId) -> gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL). -spec insert_rule(Rule :: rule()) -> ok. insert_rule(Rule) -> gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL). %%------------------------------------------------------------------------------ %% Rule Management %%------------------------------------------------------------------------------ -spec get_rules() -> [rule()]. get_rules() -> get_all_records(?RULE_TAB). get_rules_ordered_by_ts() -> lists:sort( fun(#{created_at := CreatedA}, #{created_at := CreatedB}) -> CreatedA =< CreatedB end, get_rules() ). -spec get_rules_for_topic(Topic :: binary()) -> [rule()]. get_rules_for_topic(Topic) -> [ Rule || Rule = #{from := From} <- get_rules(), emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. get_rules_with_same_event(Topic) -> EventName = emqx_rule_events:event_name(Topic), [ Rule || Rule = #{from := From} <- get_rules(), lists:any(fun(T) -> is_of_event_name(EventName, T) end, From) ]. -spec get_rule_ids_by_action(action_name()) -> [rule_id()]. get_rule_ids_by_action(ActionName) when is_binary(ActionName) -> [ Id || #{actions := Acts, id := Id} <- get_rules(), lists:any(fun(A) -> A =:= ActionName end, Acts) ]; get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> {Mod, Fun} = case string:split(FuncName, ":", leading) of [M, F] -> {binary_to_module(M), F}; [F] -> {emqx_rule_actions, F} end, [ Id || #{actions := Acts, id := Id} <- get_rules(), contains_actions(Acts, Mod, Fun) ]. -spec ensure_action_removed(rule_id(), action_name()) -> ok. ensure_action_removed(RuleId, ActionName) -> FilterFunc = fun (Func, Func) -> false; (#{<<"function">> := Func}, #{function := Func}) -> false; (_, _) -> true end, case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of not_found -> ok; #{<<"actions">> := Acts} -> NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)], {ok, _} = emqx_conf:update( emqx_rule_engine:config_key_path() ++ [RuleId, actions], NewActs, #{override_to => cluster} ), ok end. is_of_event_name(EventName, Topic) -> EventName =:= emqx_rule_events:event_name(Topic). -spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found. get_rule(Id) -> case ets:lookup(?RULE_TAB, Id) of [{Id, Rule}] -> {ok, Rule#{id => Id}}; [] -> not_found end. load_hooks_for_rule(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). maybe_add_metrics_for_rule(Id) -> case emqx_metrics_worker:has_metrics(rule_metrics, Id) of true -> ok = reset_metrics_for_rule(Id); false -> ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS) end. clear_metrics_for_rule(Id) -> ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id). -spec reset_metrics_for_rule(rule_id()) -> ok. reset_metrics_for_rule(Id) -> emqx_metrics_worker:reset_metrics(rule_metrics, Id). unload_hooks_for_rule(#{id := Id, from := Topics}) -> lists:foreach( fun(Topic) -> case get_rules_with_same_event(Topic) of %% we are now deleting the last rule [#{id := Id0}] when Id0 == Id -> emqx_rule_events:unload(Topic); _ -> ok end end, Topics ). %%------------------------------------------------------------------------------ %% Telemetry helper functions %%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{ num_rules => non_neg_integer(), referenced_bridges => #{BridgeType => non_neg_integer()} } when BridgeType :: atom(). get_basic_usage_info() -> try Rules = get_rules(), EnabledRules = lists:filter( fun(#{enable := Enabled}) -> Enabled end, Rules ), NumRules = length(EnabledRules), ReferencedBridges = lists:foldl( fun(#{actions := Actions, from := From}, Acc) -> BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], BridgeIDs1 = lists:filter(fun is_binary/1, Actions), tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) end, #{}, EnabledRules ), #{ num_rules => NumRules, referenced_bridges => ReferencedBridges } catch _:_ -> #{ num_rules => 0, referenced_bridges => #{} } end. tally_referenced_bridges(BridgeIDs, Acc0) -> lists:foldl( fun(BridgeID, Acc) -> {BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeID), maps:update_with( BridgeType, fun(X) -> X + 1 end, 1, Acc ) end, Acc0, BridgeIDs ). %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ init([]) -> _TableId = ets:new(?KV_TAB, [ named_table, set, public, {write_concurrency, true}, {read_concurrency, true} ]), ok = emqx_config_handler:add_handler( [rule_engine, jq_implementation_module], emqx_rule_engine_schema ), {ok, #{}}. handle_call({insert_rule, Rule}, _From, State) -> do_insert_rule(Rule), {reply, ok, State}; handle_call({delete_rule, Rule}, _From, State) -> do_delete_rule(Rule), {reply, ok, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", request => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", request => Msg}), {noreply, State}. handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", request => Info}), {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> Rule = #{ id => RuleId, name => maps:get(name, Params, <<"">>), created_at => CreatedAt, updated_at => now_ms(), enable => maps:get(enable, Params, true), sql => Sql, actions => parse_actions(Actions), description => maps:get(description, Params, ""), %% -- calculated fields: from => emqx_rule_sqlparser:select_from(Select), is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), fields => emqx_rule_sqlparser:select_fields(Select), doeach => emqx_rule_sqlparser:select_doeach(Select), incase => emqx_rule_sqlparser:select_incase(Select), conditions => emqx_rule_sqlparser:select_where(Select) %% -- calculated fields end }, ok = insert_rule(Rule), {ok, Rule}; {error, Reason} -> {error, Reason} end. do_insert_rule(#{id := Id} = Rule) -> ok = load_hooks_for_rule(Rule), ok = maybe_add_metrics_for_rule(Id), true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), ok. do_delete_rule(RuleId) -> case get_rule(RuleId) of {ok, Rule} -> ok = unload_hooks_for_rule(Rule), ok = clear_metrics_for_rule(RuleId), true = ets:delete(?RULE_TAB, RuleId), ok; not_found -> ok end. parse_actions(Actions) -> [do_parse_action(Act) || Act <- Actions]. do_parse_action(Action) when is_map(Action) -> emqx_rule_actions:parse_action(Action); do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) -> BridgeChannelId. get_all_records(Tab) -> [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. maps_foreach(Fun, Map) -> lists:foreach(Fun, maps:to_list(Map)). now_ms() -> erlang:system_time(millisecond). bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B. binary_to_module(ModName) -> try binary_to_existing_atom(ModName, utf8) catch error:badarg -> not_exist_mod end. contains_actions(Actions, Mod0, Func0) -> lists:any( fun (#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true; (_) -> false end, Actions ).