diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index b2a6a549e..7df5d9941 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -109,6 +109,7 @@ %% Tables -define(RULE_TAB, emqx_rule_engine). +-define(RULE_TOPIC_INDEX, emqx_rule_engine_topic_index). %% Allowed sql function provider modules -define(DEFAULT_SQL_FUNC_PROVIDER, emqx_rule_funcs). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 66c82d3a1..9d2d918ae 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -176,7 +176,7 @@ create_rule(Params) -> create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) -> case get_rule(RuleId) of - not_found -> parse_and_insert(Params, CreatedAt); + not_found -> with_parsed_rule(Params, CreatedAt, fun insert_rule/1); {ok, _} -> {error, already_exists} end. @@ -185,18 +185,27 @@ update_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> case get_rule(RuleId) of not_found -> {error, not_found}; - {ok, #{created_at := CreatedAt}} -> - parse_and_insert(Params, CreatedAt) + {ok, RulePrev = #{created_at := CreatedAt}} -> + with_parsed_rule(Params, CreatedAt, fun(Rule) -> update_rule(Rule, RulePrev) end) end. -spec delete_rule(RuleId :: rule_id()) -> ok. delete_rule(RuleId) when is_binary(RuleId) -> - gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL). + case get_rule(RuleId) of + not_found -> + ok; + {ok, Rule} -> + gen_server:call(?RULE_ENGINE, {delete_rule, Rule}, ?T_CALL) + end. -spec insert_rule(Rule :: rule()) -> ok. insert_rule(Rule) -> gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL). +-spec update_rule(Rule :: rule(), RulePrev :: rule()) -> ok. +update_rule(Rule, RulePrev) -> + gen_server:call(?RULE_ENGINE, {update_rule, Rule, RulePrev}, ?T_CALL). + %%---------------------------------------------------------------------------------------- %% Rule Management %%---------------------------------------------------------------------------------------- @@ -216,9 +225,8 @@ get_rules_ordered_by_ts() -> -spec get_rules_for_topic(Topic :: binary()) -> [rule()]. get_rules_for_topic(Topic) -> [ - Rule - || Rule = #{from := From} <- get_rules(), - emqx_topic:match_any(Topic, From) + emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX) + || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. @@ -411,10 +419,17 @@ init([]) -> {ok, #{}}. handle_call({insert_rule, Rule}, _From, State) -> - do_insert_rule(Rule), + ok = do_insert_rule(Rule), + ok = do_update_rule_index(Rule), + {reply, ok, State}; +handle_call({update_rule, Rule, RulePrev}, _From, State) -> + ok = do_delete_rule_index(RulePrev), + ok = do_insert_rule(Rule), + ok = do_update_rule_index(Rule), {reply, ok, State}; handle_call({delete_rule, Rule}, _From, State) -> - do_delete_rule(Rule), + ok = do_delete_rule_index(Rule), + ok = do_delete_rule(Rule), {reply, ok, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", request => Req}), @@ -438,7 +453,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%---------------------------------------------------------------------------------------- -parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) -> +with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt, Fun) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> Rule = #{ @@ -459,7 +474,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat conditions => emqx_rule_sqlparser:select_where(Select) %% -- calculated fields end }, - ok = insert_rule(Rule), + ok = Fun(Rule), {ok, Rule}; {error, Reason} -> {error, Reason} @@ -471,16 +486,27 @@ do_insert_rule(#{id := Id} = Rule) -> true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), ok. -do_delete_rule(RuleId) -> - case get_rule(RuleId) of - {ok, Rule} -> - ok = unload_hooks_for_rule(Rule), - ok = clear_metrics_for_rule(RuleId), - true = ets:delete(?RULE_TAB, RuleId), - ok; - not_found -> - ok - end. +do_delete_rule(#{id := Id} = Rule) -> + ok = unload_hooks_for_rule(Rule), + ok = clear_metrics_for_rule(Id), + true = ets:delete(?RULE_TAB, Id), + ok. + +do_update_rule_index(#{id := Id, from := From} = Rule) -> + ok = lists:foreach( + fun(Topic) -> + true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX) + end, + From + ). + +do_delete_rule_index(#{id := Id, from := From}) -> + ok = lists:foreach( + fun(Topic) -> + true = emqx_topic_index:delete(Topic, Id, ?RULE_TOPIC_INDEX) + end, + From + ). parse_actions(Actions) -> [do_parse_action(Act) || Act <- Actions]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index d8b031bdd..28515cb1a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -26,6 +26,7 @@ start(_Type, _Args) -> _ = ets:new(?RULE_TAB, [named_table, public, ordered_set, {read_concurrency, true}]), + _ = ets:new(?RULE_TOPIC_INDEX, [named_table, public, ordered_set, {read_concurrency, true}]), ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(),