482 lines
16 KiB
Erlang
482 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_registry).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
|
|
-export([start_link/0]).
|
|
|
|
%% Rule Management
|
|
-export([ get_rules/0
|
|
, get_rules_for/1
|
|
, get_rules_with_same_event/1
|
|
, get_rules_ordered_by_ts/0
|
|
, get_rule/1
|
|
, add_rule/1
|
|
, add_rules/1
|
|
, remove_rule/1
|
|
, remove_rules/1
|
|
]).
|
|
|
|
%% Action Management
|
|
-export([ add_action/1
|
|
, add_actions/1
|
|
, get_actions/0
|
|
, find_action/1
|
|
, remove_action/1
|
|
, remove_actions/1
|
|
, remove_actions_of/1
|
|
, add_action_instance_params/1
|
|
, get_action_instance_params/1
|
|
, remove_action_instance_params/1
|
|
]).
|
|
|
|
%% Resource Management
|
|
-export([ get_resources/0
|
|
, add_resource/1
|
|
, add_resource_params/1
|
|
, find_resource/1
|
|
, find_resource_params/1
|
|
, get_resources_by_type/1
|
|
, remove_resource/1
|
|
, remove_resource_params/1
|
|
]).
|
|
|
|
%% Resource Types
|
|
-export([ get_resource_types/0
|
|
, find_resource_type/1
|
|
, find_rules_depends_on_resource/1
|
|
, find_enabled_rules_depends_on_resource/1
|
|
, register_resource_types/1
|
|
, unregister_resource_types_of/1
|
|
]).
|
|
|
|
-export([ load_hooks_for_rule/1
|
|
, unload_hooks_for_rule/1
|
|
]).
|
|
|
|
%% for debug purposes
|
|
-export([dump/0]).
|
|
|
|
%% gen_server Callbacks
|
|
-export([ init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
, code_change/3
|
|
]).
|
|
|
|
%% Mnesia bootstrap
|
|
-export([mnesia/1]).
|
|
|
|
-boot_mnesia({mnesia, [boot]}).
|
|
-copy_mnesia({mnesia, [copy]}).
|
|
|
|
-define(REGISTRY, ?MODULE).
|
|
|
|
-define(T_CALL, 10000).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Mnesia bootstrap
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Mnesia bootstrap hook.
%%
%% `boot' creates the rule, action, resource and resource type tables with a
%% copy on the local node; `copy' adds a local copy of each of them.
%% Rules and resources use `disc_copies', actions and resource types use
%% `ram_copies'. Any result other than `ok' from ekka_mnesia fails the match.
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
    %% ETS storage option shared by all four tables.
    StoreProps = [{ets, [{read_concurrency, true}]}],
    %% {Table, CopyType, RecordName, IndexPositions, Fields}
    TableSpecs =
        [{?RULE_TAB, disc_copies, rule,
          [#rule.for], record_info(fields, rule)},
         {?ACTION_TAB, ram_copies, action,
          [#action.for, #action.app], record_info(fields, action)},
         {?RES_TAB, disc_copies, resource,
          [#resource.type], record_info(fields, resource)},
         {?RES_TYPE_TAB, ram_copies, resource_type,
          [#resource_type.provider], record_info(fields, resource_type)}],
    lists:foreach(
      fun({Tab, CopyType, RecName, Index, Fields}) ->
              ok = ekka_mnesia:create_table(Tab, [{CopyType, [node()]},
                                                  {record_name, RecName},
                                                  {index, Index},
                                                  {attributes, Fields},
                                                  {storage_properties, StoreProps}])
      end, TableSpecs);
mnesia(copy) ->
    %% Same table order as in the boot clause.
    lists:foreach(
      fun({Tab, CopyType}) ->
              ok = ekka_mnesia:copy_table(Tab, CopyType)
      end,
      [{?RULE_TAB, disc_copies},
       {?ACTION_TAB, ram_copies},
       {?RES_TAB, disc_copies},
       {?RES_TYPE_TAB, ram_copies}]).
|
|
|
|
%% @doc Debug helper: print the full contents of the rule, action instance
%% params, resource and resource params tables with io:format/2.
dump() ->
    Tables = [?RULE_TAB, ?ACTION_INST_PARAMS_TAB, ?RES_TAB, ?RES_PARAMS_TAB],
    Format = "Rules: ~p~n"
             "ActionInstParams: ~p~n"
             "Resources: ~p~n"
             "ResourceParams: ~p~n",
    %% One ~p per table, in the same order as Tables.
    io:format(Format, [ets:tab2list(Tab) || Tab <- Tables]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Start the registry
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Start the registry gen_server, linked to the caller and registered
%% locally under the ?REGISTRY name.
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?REGISTRY},
    gen_server:start_link(ServerName, ?MODULE, [], []).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Rule Management
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Return every record currently held in the rule table.
-spec get_rules() -> [emqx_rule_engine:rule()].
get_rules() ->
    get_all_records(?RULE_TAB).
|
|
|
|
%% @doc Return all rules sorted by their `created_at' field, ascending.
%%
%% The table is read and sorted with QLC inside a mnesia transaction; an
%% aborted transaction fails the `{atomic, _}' match and raises `badmatch'.
-spec get_rules_ordered_by_ts() -> [emqx_rule_engine:rule()].
get_rules_ordered_by_ts() ->
    F = fun() ->
            Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
            qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
        end,
    {atomic, List} = mnesia:transaction(F),
    List.
|
|
|
|
%% @doc Return the rules whose `for' field satisfies
%% emqx_rule_utils:can_topic_match_oneof(Topic, For).
-spec get_rules_for(Topic :: binary()) -> [emqx_rule_engine:rule()].
get_rules_for(Topic) ->
    Matches = fun(#rule{for = For}) ->
                      emqx_rule_utils:can_topic_match_oneof(Topic, For);
                 (_NotARule) ->
                      %% Anything that is not a rule record is skipped.
                      false
              end,
    lists:filter(Matches, get_rules()).
|
|
|
|
%% @doc Return the rules that have at least one entry in their `for' list
%% mapping to the same event name as Topic (per emqx_rule_events:event_name/1).
-spec get_rules_with_same_event(Topic :: binary()) -> [emqx_rule_engine:rule()].
get_rules_with_same_event(Topic) ->
    EventName = emqx_rule_events:event_name(Topic),
    SameEvent = fun(T) -> is_of_event_name(EventName, T) end,
    Selected = fun(#rule{for = For}) -> lists:any(SameEvent, For);
                  (_NotARule) -> false
               end,
    lists:filter(Selected, get_rules()).
|
|
|
|
%% @private
%% @doc True when emqx_rule_events:event_name(Topic) is exactly EventName.
is_of_event_name(EventName, Topic) ->
    case emqx_rule_events:event_name(Topic) of
        EventName -> true;
        _Different -> false
    end.
|
|
|
|
%% @doc Look up a rule by id with a dirty read of the rule table.
-spec get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found.
get_rule(Id) ->
    Found = mnesia:dirty_read(?RULE_TAB, Id),
    case Found of
        [] -> not_found;
        [Rule] -> {ok, Rule}
    end.
|
|
|
|
%% @doc Add a single rule; delegates to add_rules/1 with a one-element list.
-spec add_rule(emqx_rule_engine:rule()) -> ok.
add_rule(#rule{} = Rule) ->
    add_rules([Rule]).
|
|
|
|
%% @doc Ask the registry process to store the given rules.
%% Synchronous call bounded by the ?T_CALL timeout.
-spec add_rules([emqx_rule_engine:rule()]) -> ok.
add_rules(Rules) ->
    Request = {add_rules, Rules},
    gen_server:call(?REGISTRY, Request, ?T_CALL).
|
|
|
|
%% @doc Remove a single rule, given either the rule record or its id;
%% delegates to remove_rules/1 with a one-element list.
-spec remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok.
remove_rule(RuleOrId) ->
    Targets = [RuleOrId],
    remove_rules(Targets).
|
|
|
|
%% @doc Ask the registry process to delete the given rules (records or ids).
%% Synchronous call bounded by the ?T_CALL timeout.
-spec remove_rules([emqx_rule_engine:rule()] | [rule_id()]) -> ok.
remove_rules(Rules) ->
    Request = {remove_rules, Rules},
    gen_server:call(?REGISTRY, Request, ?T_CALL).
|
|
|
|
%% @private
%% @doc Invoke load_hooks_for_rule through ?CLUSTER_CALL (its result is
%% ignored), then write the rule to the rule table. Run via trans/2 by the
%% add_rules clause of handle_call/3.
insert_rule(Rule) ->
    _Ignored = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
    mnesia:write(?RULE_TAB, Rule, write).
|
|
|
|
%% @private
%% @doc Delete a rule given its binary id or the rule itself.
%%
%% A binary id is resolved with get_rule/1 first; an unknown id is a no-op
%% returning `ok'. For a rule, unload_hooks_for_rule is invoked through
%% ?CLUSTER_CALL (result ignored) before the object is deleted from the table.
delete_rule(RuleId) when is_binary(RuleId) ->
    case get_rule(RuleId) of
        not_found -> ok;
        {ok, Rule} -> delete_rule(Rule)
    end;
delete_rule(Rule) ->
    _Ignored = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
    mnesia:delete_object(?RULE_TAB, Rule, write).
|
|
|
|
%% @doc Call emqx_rule_events:load/1 for every entry in the rule's `for' list.
%% Exported so that insert_rule/1 can invoke it through ?CLUSTER_CALL.
-spec load_hooks_for_rule(emqx_rule_engine:rule()) -> ok.
load_hooks_for_rule(#rule{for = Topics}) ->
    lists:foreach(fun emqx_rule_events:load/1, Topics).
|
|
|
|
%% @doc For every entry in the rule's `for' list, call
%% emqx_rule_events:unload/1 when this rule is the only rule registered for
%% the same event; otherwise leave the hook in place.
%% Exported so that delete_rule/1 can invoke it through ?CLUSTER_CALL.
-spec unload_hooks_for_rule(emqx_rule_engine:rule()) -> ok.
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
    lists:foreach(fun(Topic) ->
            case get_rules_with_same_event(Topic) of
                %% Id is already bound here, so this clause only matches a
                %% single-element list holding the very rule being removed:
                %% we are now deleting the last rule
                [#rule{id = Id}] ->
                    emqx_rule_events:unload(Topic);
                _ -> ok
            end
        end, Topics).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Action Management
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Return every record currently held in the action table.
-spec get_actions() -> [emqx_rule_engine:action()].
get_actions() ->
    get_all_records(?ACTION_TAB).
|
|
|
|
%% @doc Look up an action by name with a dirty read of the action table.
-spec find_action(Name :: action_name()) -> {ok, emqx_rule_engine:action()} | not_found.
find_action(Name) ->
    Found = mnesia:dirty_read(?ACTION_TAB, Name),
    case Found of
        [] -> not_found;
        [Action] -> {ok, Action}
    end.
|
|
|
|
%% @doc Write one action record to the action table inside a transaction.
-spec add_action(emqx_rule_engine:action()) -> ok.
add_action(#action{} = Action) ->
    trans(fun insert_action/1, [Action]).
|
|
|
|
%% @doc Write a list of action records to the action table, all within a
%% single transaction.
-spec add_actions([emqx_rule_engine:action()]) -> ok.
add_actions(Actions) when is_list(Actions) ->
    InsertAll = fun() -> lists:foreach(fun insert_action/1, Actions) end,
    trans(InsertAll).
|
|
|
|
%% @doc Remove an action inside a transaction.
%% An action record is removed as an object via delete_action/1; any other
%% argument is treated as the key of the entry to delete.
-spec remove_action(emqx_rule_engine:action() | atom()) -> ok.
remove_action(#action{} = Action) ->
    trans(fun delete_action/1, [Action]);
remove_action(Name) ->
    Oid = {?ACTION_TAB, Name},
    trans(fun() -> mnesia:delete(Oid) end).
|
|
|
|
%% @doc Remove a list of actions, all within a single transaction.
-spec remove_actions([emqx_rule_engine:action()]) -> ok.
remove_actions(Actions) ->
    DeleteAll = fun() -> lists:foreach(fun delete_action/1, Actions) end,
    trans(DeleteAll).
|
|
|
|
%% @doc Remove every action whose `app' field equals App.
%% The index lookup and the deletions share one transaction.
-spec remove_actions_of(App :: atom()) -> ok.
remove_actions_of(App) ->
    DeleteOwned =
        fun() ->
                Owned = mnesia:index_read(?ACTION_TAB, App, #action.app),
                lists:foreach(fun delete_action/1, Owned)
        end,
    trans(DeleteOwned).
|
|
|
|
%% @private
%% @doc Write an action record to the action table under a write lock.
%% Used as the transaction body of add_action/1 and add_actions/1.
insert_action(Action) ->
    mnesia:write(?ACTION_TAB, Action, write).
|
|
|
|
%% @private
%% @doc Delete an action from the action table under a write lock.
%% A record is deleted as an exact object, an atom is deleted by key.
delete_action(#action{} = Action) ->
    mnesia:delete_object(?ACTION_TAB, Action, write);
delete_action(Name) when is_atom(Name) ->
    mnesia:delete(?ACTION_TAB, Name, write).
|
|
|
|
%% @doc Store the params of an action instance in the
%% ?ACTION_INST_PARAMS_TAB ETS table.
-spec add_action_instance_params(emqx_rule_engine:action_instance_params()) -> ok.
add_action_instance_params(#action_instance_params{} = ActionInstParams) ->
    ets:insert(?ACTION_INST_PARAMS_TAB, ActionInstParams),
    ok.
|
|
|
|
%% @doc Look up the params of an action instance by id in the
%% ?ACTION_INST_PARAMS_TAB ETS table.
-spec get_action_instance_params(action_instance_id())
        -> {ok, emqx_rule_engine:action_instance_params()} | not_found.
get_action_instance_params(ActionInstId) ->
    Found = ets:lookup(?ACTION_INST_PARAMS_TAB, ActionInstId),
    case Found of
        [] -> not_found;
        [ActionInstParams] -> {ok, ActionInstParams}
    end.
|
|
|
|
%% @doc Delete the params of an action instance from the
%% ?ACTION_INST_PARAMS_TAB ETS table. Always returns `ok'.
-spec remove_action_instance_params(action_instance_id()) -> ok.
remove_action_instance_params(ActionInstId) ->
    ets:delete(?ACTION_INST_PARAMS_TAB, ActionInstId),
    ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Resource Management
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Return every record currently held in the resource table.
-spec get_resources() -> [emqx_rule_engine:resource()].
get_resources() ->
    get_all_records(?RES_TAB).
|
|
|
|
%% @doc Write one resource record to the resource table inside a transaction.
-spec add_resource(emqx_rule_engine:resource()) -> ok.
add_resource(#resource{} = Resource) ->
    trans(fun insert_resource/1, [Resource]).
|
|
|
|
%% @doc Store resource params in the ?RES_PARAMS_TAB ETS table.
-spec add_resource_params(emqx_rule_engine:resource_params()) -> ok.
add_resource_params(#resource_params{} = ResParams) ->
    ets:insert(?RES_PARAMS_TAB, ResParams),
    ok.
|
|
|
|
%% @doc Look up a resource by id with a dirty read of the resource table.
-spec find_resource(Id :: resource_id()) -> {ok, emqx_rule_engine:resource()} | not_found.
find_resource(Id) ->
    Found = mnesia:dirty_read(?RES_TAB, Id),
    case Found of
        [] -> not_found;
        [Res] -> {ok, Res}
    end.
|
|
|
|
%% @doc Look up resource params by resource id in the ?RES_PARAMS_TAB ETS table.
-spec find_resource_params(Id :: resource_id())
        -> {ok, emqx_rule_engine:resource_params()} | not_found.
find_resource_params(Id) ->
    Found = ets:lookup(?RES_PARAMS_TAB, Id),
    case Found of
        [] -> not_found;
        [ResParams] -> {ok, ResParams}
    end.
|
|
|
|
%% @doc Remove a resource, given the record or its binary id, inside a
%% transaction. Returns whatever delete_resource/1 returns, i.e. an
%% `{error, _}' tuple when enabled rules still depend on the resource.
-spec remove_resource(emqx_rule_engine:resource() | emqx_rule_engine:resource_id())
        -> ok | {error, term()}.
remove_resource(#resource{id = ResId}) ->
    trans(fun delete_resource/1, [ResId]);
remove_resource(ResId) when is_binary(ResId) ->
    trans(fun delete_resource/1, [ResId]).
|
|
|
|
%% @doc Delete the params of a resource from the ?RES_PARAMS_TAB ETS table.
%% Always returns `ok'.
-spec remove_resource_params(emqx_rule_engine:resource_id()) -> ok.
remove_resource_params(ResId) ->
    ets:delete(?RES_PARAMS_TAB, ResId),
    ok.
|
|
|
|
%% @private
%% @doc Delete a resource by id unless enabled rules still depend on it.
%% In that case nothing is deleted and
%% `{error, {dependent_rules_exists, RuleIds}}' is returned instead.
delete_resource(ResId) ->
    case find_enabled_rules_depends_on_resource(ResId) of
        [] ->
            mnesia:delete(?RES_TAB, ResId, write);
        Dependents ->
            RuleIds = [Id || #rule{id = Id} <- Dependents],
            {error, {dependent_rules_exists, RuleIds}}
    end.
|
|
|
|
%% @private
%% @doc Write a resource record to the resource table under a write lock.
%% Used as the transaction body of add_resource/1.
insert_resource(Resource) ->
    mnesia:write(?RES_TAB, Resource, write).
|
|
|
|
%% @doc Return the rules that depend on the resource ResId (see
%% find_rules_depends_on_resource/1) and whose `enabled' field is `true'.
-spec find_enabled_rules_depends_on_resource(resource_id()) -> [emqx_rule_engine:rule()].
find_enabled_rules_depends_on_resource(ResId) ->
    [R || #rule{enabled = true} = R <- find_rules_depends_on_resource(ResId)].
|
|
|
|
%% @doc Return every rule that has at least one action instance referring to
%% the resource ResId.
%%
%% Matches are prepended to the accumulator, so the result is in the reverse
%% of the order returned by get_rules/0.
-spec find_rules_depends_on_resource(resource_id()) -> [emqx_rule_engine:rule()].
find_rules_depends_on_resource(ResId) ->
    lists:foldl(fun(#rule{actions = Actions} = R, Rules) ->
            case search_action_despends_on_resource(ResId, Actions) of
                false -> Rules;
                {value, _} -> [R | Rules]
            end
        end, [], get_rules()).
|
|
|
|
%% @private
%% @doc Search Actions for the first action instance whose args map has
%% ResId under the `<<"$resource">>' key. Returns the lists:search/2 result:
%% `{value, ActionInstance}' or `false'.
search_action_despends_on_resource(ResId, Actions) ->
    RefersToRes =
        fun(#action_instance{args = #{<<"$resource">> := Ref}}) ->
                Ref =:= ResId;
           (_Other) ->
                false
        end,
    lists:search(RefersToRes, Actions).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Resource Type Management
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Return every record currently held in the resource type table.
-spec get_resource_types() -> [emqx_rule_engine:resource_type()].
get_resource_types() ->
    get_all_records(?RES_TYPE_TAB).
|
|
|
|
%% @doc Look up a resource type by name with a dirty read of the resource
%% type table.
-spec find_resource_type(Name :: resource_type_name())
        -> {ok, emqx_rule_engine:resource_type()} | not_found.
find_resource_type(Name) ->
    Found = mnesia:dirty_read(?RES_TYPE_TAB, Name),
    case Found of
        [] -> not_found;
        [ResType] -> {ok, ResType}
    end.
|
|
|
|
%% @doc Return the resources whose `type' field equals Type, using a dirty
%% read on the `type' index of the resource table.
-spec get_resources_by_type(Type :: resource_type_name()) -> [emqx_rule_engine:resource()].
get_resources_by_type(Type) ->
    IndexPos = #resource.type,
    mnesia:dirty_index_read(?RES_TAB, Type, IndexPos).
|
|
|
|
%% @doc Write a list of resource type records to the resource type table,
%% all within a single transaction.
-spec register_resource_types([emqx_rule_engine:resource_type()]) -> ok.
register_resource_types(Types) ->
    InsertAll = fun() -> lists:foreach(fun insert_resource_type/1, Types) end,
    trans(InsertAll).
|
|
|
|
%% @doc Remove every resource type whose `provider' field equals App.
%% The index lookup and the deletions share one transaction.
-spec unregister_resource_types_of(App :: atom()) -> ok.
unregister_resource_types_of(App) ->
    DeleteProvided =
        fun() ->
                Provided = mnesia:index_read(?RES_TYPE_TAB, App, #resource_type.provider),
                lists:foreach(fun delete_resource_type/1, Provided)
        end,
    trans(DeleteProvided).
|
|
|
|
%% @private
%% @doc Write a resource type record to the resource type table under a
%% write lock. Used as the transaction body of register_resource_types/1.
insert_resource_type(Type) ->
    mnesia:write(?RES_TYPE_TAB, Type, write).
|
|
|
|
%% @private
%% @doc Delete a resource type record, as an exact object, from the resource
%% type table under a write lock.
delete_resource_type(Type) ->
    mnesia:delete_object(?RES_TYPE_TAB, Type, write).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @private
%% @doc gen_server init callback: create the named, public ?KV_TAB ETS set
%% (owned by this process) and start with an empty map as state.
init([]) ->
    TabOpts = [named_table, set, public,
               {write_concurrency, true},
               {read_concurrency, true}],
    _TableId = ets:new(?KV_TAB, TabOpts),
    {ok, #{}}.
|
|
|
|
%% @private
%% @doc gen_server call handler.
%%
%% `{add_rules, Rules}' and `{remove_rules, Rules}' run insert_rule/1 or
%% delete_rule/1 over the whole list in one transaction and reply `ok'.
%% Any other request is logged as an error and answered with `ignored'.
handle_call({add_rules, Rules}, _From, State) ->
    trans(fun() -> lists:foreach(fun insert_rule/1, Rules) end),
    {reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) ->
    trans(fun() -> lists:foreach(fun delete_rule/1, Rules) end),
    {reply, ok, State};
handle_call(Unexpected, _From, State) ->
    ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Unexpected]),
    {reply, ignored, State}.
|
|
|
|
%% @private
%% @doc gen_server cast handler: no casts are expected, so every message is
%% logged as an error and the state is left unchanged.
handle_cast(Unexpected, State) ->
    ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Unexpected]),
    {noreply, State}.
|
|
|
|
%% @private
%% @doc gen_server info handler: no plain messages are expected, so every
%% message is logged as an error and the state is left unchanged.
handle_info(Unexpected, State) ->
    ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Unexpected]),
    {noreply, State}.
|
|
|
|
%% @private
%% @doc gen_server terminate callback: performs no cleanup.
terminate(_Why, _LastState) ->
    ok.
|
|
|
|
%% @private
%% @doc gen_server code_change callback: the state is carried over unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Private functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @private
%% @doc Return all objects of table Tab as a list.
%% Reads the ETS table named Tab directly with ets:tab2list/1 instead of
%% going through the mnesia API.
get_all_records(Tab) ->
    ets:tab2list(Tab).
|
|
|
|
%% @private
%% @doc Run Fun (applied to Args, default `[]') in a mnesia transaction.
%% Returns the fun's result on commit and raises the abort reason with
%% error/1 when the transaction is aborted.
trans(Fun) ->
    trans(Fun, []).

trans(Fun, Args) ->
    Outcome = mnesia:transaction(Fun, Args),
    case Outcome of
        {aborted, Reason} -> error(Reason);
        {atomic, Result} -> Result
    end.
|