%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_registry). -behaviour(gen_server). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/qlc.hrl"). -export([start_link/0]). %% Rule Management -export([ get_rules/0 , get_rules_for/1 , get_rules_with_same_event/1 , get_rules_ordered_by_ts/0 , get_rule/1 , add_rule/1 , add_rules/1 , remove_rule/1 , remove_rules/1 ]). %% Action Management -export([ add_action/1 , add_actions/1 , get_actions/0 , find_action/1 , remove_action/1 , remove_actions/1 , remove_actions_of/1 , add_action_instance_params/1 , get_action_instance_params/1 , remove_action_instance_params/1 ]). %% Resource Management -export([ get_resources/0 , add_resource/1 , add_resource_params/1 , find_resource/1 , find_resource_params/1 , get_resources_by_type/1 , remove_resource/1 , remove_resource_params/1 ]). %% Resource Types -export([ get_resource_types/0 , find_resource_type/1 , find_rules_depends_on_resource/1 , find_enabled_rules_depends_on_resource/1 , register_resource_types/1 , unregister_resource_types_of/1 ]). -export([ load_hooks_for_rule/1 , unload_hooks_for_rule/1 ]). %% for debug purposes -export([dump/0]). %% gen_server Callbacks -export([ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 , code_change/3 ]). %% Mnesia bootstrap -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -define(REGISTRY, ?MODULE). -define(T_CALL, 10000). %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> %% Optimize storage StoreProps = [{ets, [{read_concurrency, true}]}], %% Rule table ok = ekka_mnesia:create_table(?RULE_TAB, [ {disc_copies, [node()]}, {record_name, rule}, {index, [#rule.for]}, {attributes, record_info(fields, rule)}, {storage_properties, StoreProps}]), %% Rule action table ok = ekka_mnesia:create_table(?ACTION_TAB, [ {ram_copies, [node()]}, {record_name, action}, {index, [#action.for, #action.app]}, {attributes, record_info(fields, action)}, {storage_properties, StoreProps}]), %% Resource table ok = ekka_mnesia:create_table(?RES_TAB, [ {disc_copies, [node()]}, {record_name, resource}, {index, [#resource.type]}, {attributes, record_info(fields, resource)}, {storage_properties, StoreProps}]), %% Resource type table ok = ekka_mnesia:create_table(?RES_TYPE_TAB, [ {ram_copies, [node()]}, {record_name, resource_type}, {index, [#resource_type.provider]}, {attributes, record_info(fields, resource_type)}, {storage_properties, StoreProps}]); mnesia(copy) -> %% Copy rule table ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies), %% Copy rule action table ok = ekka_mnesia:copy_table(?ACTION_TAB, ram_copies), %% Copy resource table ok = ekka_mnesia:copy_table(?RES_TAB, disc_copies), %% Copy resource type table ok = ekka_mnesia:copy_table(?RES_TYPE_TAB, ram_copies). dump() -> io:format("Rules: ~p~n" "ActionInstParams: ~p~n" "Resources: ~p~n" "ResourceParams: ~p~n", [ets:tab2list(?RULE_TAB), ets:tab2list(?ACTION_INST_PARAMS_TAB), ets:tab2list(?RES_TAB), ets:tab2list(?RES_PARAMS_TAB)]). %%------------------------------------------------------------------------------ %% Start the registry %%------------------------------------------------------------------------------ -spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}). start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% Rule Management %%------------------------------------------------------------------------------ -spec(get_rules() -> list(emqx_rule_engine:rule())). get_rules() -> get_all_records(?RULE_TAB). get_rules_ordered_by_ts() -> F = fun() -> Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) end, {atomic, List} = mnesia:transaction(F), List. -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), emqx_rule_utils:can_topic_match_oneof(Topic, For)]. -spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_with_same_event(Topic) -> EventName = emqx_rule_events:event_name(Topic), [Rule || Rule = #rule{for = For} <- get_rules(), lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)]. is_of_event_name(EventName, Topic) -> EventName =:= emqx_rule_events:event_name(Topic). -spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). get_rule(Id) -> case mnesia:dirty_read(?RULE_TAB, Id) of [Rule] -> {ok, Rule}; [] -> not_found end. -spec(add_rule(emqx_rule_engine:rule()) -> ok). add_rule(Rule) when is_record(Rule, rule) -> add_rules([Rule]). -spec(add_rules(list(emqx_rule_engine:rule())) -> ok). add_rules(Rules) -> gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). -spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok). remove_rule(RuleOrId) -> remove_rules([RuleOrId]). -spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok). remove_rules(Rules) -> gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private insert_rule(Rule) -> _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private delete_rule(RuleId) when is_binary(RuleId) -> case get_rule(RuleId) of {ok, Rule} -> delete_rule(Rule); not_found -> ok end; delete_rule(Rule) -> _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of [#rule{id = Id}] -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end end, Topics). %%------------------------------------------------------------------------------ %% Action Management %%------------------------------------------------------------------------------ %% @doc Get all actions. -spec(get_actions() -> list(emqx_rule_engine:action())). get_actions() -> get_all_records(?ACTION_TAB). %% @doc Find an action by name. -spec(find_action(Name :: action_name()) -> {ok, emqx_rule_engine:action()} | not_found). find_action(Name) -> case mnesia:dirty_read(?ACTION_TAB, Name) of [Action] -> {ok, Action}; [] -> not_found end. %% @doc Add an action. -spec(add_action(emqx_rule_engine:action()) -> ok). add_action(Action) when is_record(Action, action) -> trans(fun insert_action/1, [Action]). %% @doc Add actions. -spec(add_actions(list(emqx_rule_engine:action())) -> ok). add_actions(Actions) when is_list(Actions) -> trans(fun lists:foreach/2, [fun insert_action/1, Actions]). %% @doc Remove an action. -spec(remove_action(emqx_rule_engine:action() | atom()) -> ok). remove_action(Action) when is_record(Action, action) -> trans(fun delete_action/1, [Action]); remove_action(Name) -> trans(fun mnesia:delete/1, [{?ACTION_TAB, Name}]). %% @doc Remove actions. -spec(remove_actions(list(emqx_rule_engine:action())) -> ok). remove_actions(Actions) -> trans(fun lists:foreach/2, [fun delete_action/1, Actions]). %% @doc Remove actions of the App. -spec(remove_actions_of(App :: atom()) -> ok). remove_actions_of(App) -> trans(fun() -> lists:foreach(fun delete_action/1, mnesia:index_read(?ACTION_TAB, App, #action.app)) end). %% @private insert_action(Action) -> mnesia:write(?ACTION_TAB, Action, write). %% @private delete_action(Action) when is_record(Action, action) -> mnesia:delete_object(?ACTION_TAB, Action, write); delete_action(Name) when is_atom(Name) -> mnesia:delete(?ACTION_TAB, Name, write). %% @doc Add an action instance params. -spec(add_action_instance_params(emqx_rule_engine:action_instance_params()) -> ok). add_action_instance_params(ActionInstParams) when is_record(ActionInstParams, action_instance_params) -> ets:insert(?ACTION_INST_PARAMS_TAB, ActionInstParams), ok. -spec(get_action_instance_params(action_instance_id()) -> {ok, emqx_rule_engine:action_instance_params()} | not_found). get_action_instance_params(ActionInstId) -> case ets:lookup(?ACTION_INST_PARAMS_TAB, ActionInstId) of [ActionInstParams] -> {ok, ActionInstParams}; [] -> not_found end. %% @doc Delete an action instance params. -spec(remove_action_instance_params(action_instance_id()) -> ok). remove_action_instance_params(ActionInstId) -> ets:delete(?ACTION_INST_PARAMS_TAB, ActionInstId), ok. %%------------------------------------------------------------------------------ %% Resource Management %%------------------------------------------------------------------------------ -spec(get_resources() -> list(emqx_rule_engine:resource())). get_resources() -> get_all_records(?RES_TAB). -spec(add_resource(emqx_rule_engine:resource()) -> ok). add_resource(Resource) when is_record(Resource, resource) -> trans(fun insert_resource/1, [Resource]). -spec(add_resource_params(emqx_rule_engine:resource_params()) -> ok). add_resource_params(ResParams) when is_record(ResParams, resource_params) -> ets:insert(?RES_PARAMS_TAB, ResParams), ok. -spec(find_resource(Id :: resource_id()) -> {ok, emqx_rule_engine:resource()} | not_found). find_resource(Id) -> case mnesia:dirty_read(?RES_TAB, Id) of [Res] -> {ok, Res}; [] -> not_found end. -spec(find_resource_params(Id :: resource_id()) -> {ok, emqx_rule_engine:resource_params()} | not_found). find_resource_params(Id) -> case ets:lookup(?RES_PARAMS_TAB, Id) of [ResParams] -> {ok, ResParams}; [] -> not_found end. -spec(remove_resource(emqx_rule_engine:resource() | emqx_rule_engine:resource_id()) -> ok | {error, term()}). remove_resource(Resource) when is_record(Resource, resource) -> trans(fun delete_resource/1, [Resource#resource.id]); remove_resource(ResId) when is_binary(ResId) -> trans(fun delete_resource/1, [ResId]). -spec(remove_resource_params(emqx_rule_engine:resource_id()) -> ok). remove_resource_params(ResId) -> ets:delete(?RES_PARAMS_TAB, ResId), ok. %% @private delete_resource(ResId) -> case find_enabled_rules_depends_on_resource(ResId) of [] -> mnesia:delete(?RES_TAB, ResId, write); Rules -> {error, {dependent_rules_exists, [Id || #rule{id = Id} <- Rules]}} end. %% @private insert_resource(Resource) -> mnesia:write(?RES_TAB, Resource, write). find_enabled_rules_depends_on_resource(ResId) -> [R || #rule{enabled = true} = R <- find_rules_depends_on_resource(ResId)]. find_rules_depends_on_resource(ResId) -> lists:foldl(fun(#rule{actions = Actions} = R, Rules) -> case search_action_despends_on_resource(ResId, Actions) of false -> Rules; {value, _} -> [R | Rules] end end, [], get_rules()). search_action_despends_on_resource(ResId, Actions) -> lists:search(fun (#action_instance{args = #{<<"$resource">> := ResId0}}) -> ResId0 =:= ResId; (_) -> false end, Actions). %%------------------------------------------------------------------------------ %% Resource Type Management %%------------------------------------------------------------------------------ -spec(get_resource_types() -> list(emqx_rule_engine:resource_type())). get_resource_types() -> get_all_records(?RES_TYPE_TAB). -spec(find_resource_type(Name :: resource_type_name()) -> {ok, emqx_rule_engine:resource_type()} | not_found). find_resource_type(Name) -> case mnesia:dirty_read(?RES_TYPE_TAB, Name) of [ResType] -> {ok, ResType}; [] -> not_found end. -spec(get_resources_by_type(Type :: resource_type_name()) -> list(emqx_rule_engine:resource())). get_resources_by_type(Type) -> mnesia:dirty_index_read(?RES_TAB, Type, #resource.type). -spec(register_resource_types(list(emqx_rule_engine:resource_type())) -> ok). register_resource_types(Types) -> trans(fun lists:foreach/2, [fun insert_resource_type/1, Types]). %% @doc Unregister resource types of the App. -spec(unregister_resource_types_of(App :: atom()) -> ok). unregister_resource_types_of(App) -> trans(fun() -> lists:foreach(fun delete_resource_type/1, mnesia:index_read(?RES_TYPE_TAB, App, #resource_type.provider)) end). %% @private insert_resource_type(Type) -> mnesia:write(?RES_TYPE_TAB, Type, write). %% @private delete_resource_type(Type) -> mnesia:delete_object(?RES_TYPE_TAB, Type, write). %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), {ok, #{}}. handle_call({add_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), {reply, ok, State}; handle_call({remove_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), {reply, ok, State}; handle_call(Req, _From, State) -> ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%------------------------------------------------------------------------------ %% Private functions %%------------------------------------------------------------------------------ get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab). trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> case mnesia:transaction(Fun, Args) of {atomic, Result} -> Result; {aborted, Reason} -> error(Reason) end.