perf(ruleeng): employ `emqx_topic_index` to speed up topic matching

This commit is contained in:
Andrew Mayorov 2023-07-20 14:49:10 +02:00 committed by JimMoen
parent 28dad5d7a9
commit b821bdee00
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
3 changed files with 49 additions and 21 deletions

View File

@ -109,6 +109,7 @@
%% Tables %% Tables
-define(RULE_TAB, emqx_rule_engine). -define(RULE_TAB, emqx_rule_engine).
-define(RULE_TOPIC_INDEX, emqx_rule_engine_topic_index).
%% Allowed sql function provider modules %% Allowed sql function provider modules
-define(DEFAULT_SQL_FUNC_PROVIDER, emqx_rule_funcs). -define(DEFAULT_SQL_FUNC_PROVIDER, emqx_rule_funcs).

View File

@ -176,7 +176,7 @@ create_rule(Params) ->
create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) -> create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
case get_rule(RuleId) of case get_rule(RuleId) of
not_found -> parse_and_insert(Params, CreatedAt); not_found -> with_parsed_rule(Params, CreatedAt, fun insert_rule/1);
{ok, _} -> {error, already_exists} {ok, _} -> {error, already_exists}
end. end.
@ -185,18 +185,27 @@ update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
case get_rule(RuleId) of case get_rule(RuleId) of
not_found -> not_found ->
{error, not_found}; {error, not_found};
{ok, #{created_at := CreatedAt}} -> {ok, RulePrev = #{created_at := CreatedAt}} ->
parse_and_insert(Params, CreatedAt) with_parsed_rule(Params, CreatedAt, fun(Rule) -> update_rule(Rule, RulePrev) end)
end. end.
-spec delete_rule(RuleId :: rule_id()) -> ok. -spec delete_rule(RuleId :: rule_id()) -> ok.
delete_rule(RuleId) when is_binary(RuleId) -> delete_rule(RuleId) when is_binary(RuleId) ->
gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL). case get_rule(RuleId) of
not_found ->
ok;
{ok, Rule} ->
gen_server:call(?RULE_ENGINE, {delete_rule, Rule}, ?T_CALL)
end.
-spec insert_rule(Rule :: rule()) -> ok. -spec insert_rule(Rule :: rule()) -> ok.
insert_rule(Rule) -> insert_rule(Rule) ->
gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL). gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
-spec update_rule(Rule :: rule(), RulePrev :: rule()) -> ok.
update_rule(Rule, RulePrev) ->
gen_server:call(?RULE_ENGINE, {update_rule, Rule, RulePrev}, ?T_CALL).
%%---------------------------------------------------------------------------------------- %%----------------------------------------------------------------------------------------
%% Rule Management %% Rule Management
%%---------------------------------------------------------------------------------------- %%----------------------------------------------------------------------------------------
@ -216,9 +225,8 @@ get_rules_ordered_by_ts() ->
-spec get_rules_for_topic(Topic :: binary()) -> [rule()]. -spec get_rules_for_topic(Topic :: binary()) -> [rule()].
get_rules_for_topic(Topic) -> get_rules_for_topic(Topic) ->
[ [
Rule emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX)
|| Rule = #{from := From} <- get_rules(), || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX)
emqx_topic:match_any(Topic, From)
]. ].
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
@ -411,10 +419,17 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call({insert_rule, Rule}, _From, State) -> handle_call({insert_rule, Rule}, _From, State) ->
do_insert_rule(Rule), ok = do_insert_rule(Rule),
ok = do_update_rule_index(Rule),
{reply, ok, State};
handle_call({update_rule, Rule, RulePrev}, _From, State) ->
ok = do_delete_rule_index(RulePrev),
ok = do_insert_rule(Rule),
ok = do_update_rule_index(Rule),
{reply, ok, State}; {reply, ok, State};
handle_call({delete_rule, Rule}, _From, State) -> handle_call({delete_rule, Rule}, _From, State) ->
do_delete_rule(Rule), ok = do_delete_rule_index(Rule),
ok = do_delete_rule(Rule),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", request => Req}), ?SLOG(error, #{msg => "unexpected_call", request => Req}),
@ -438,7 +453,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Functions %% Internal Functions
%%---------------------------------------------------------------------------------------- %%----------------------------------------------------------------------------------------
parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) -> with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt, Fun) ->
case emqx_rule_sqlparser:parse(Sql) of case emqx_rule_sqlparser:parse(Sql) of
{ok, Select} -> {ok, Select} ->
Rule = #{ Rule = #{
@ -459,7 +474,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat
conditions => emqx_rule_sqlparser:select_where(Select) conditions => emqx_rule_sqlparser:select_where(Select)
%% -- calculated fields end %% -- calculated fields end
}, },
ok = insert_rule(Rule), ok = Fun(Rule),
{ok, Rule}; {ok, Rule};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -471,16 +486,27 @@ do_insert_rule(#{id := Id} = Rule) ->
true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
ok. ok.
do_delete_rule(RuleId) -> do_delete_rule(#{id := Id} = Rule) ->
case get_rule(RuleId) of ok = unload_hooks_for_rule(Rule),
{ok, Rule} -> ok = clear_metrics_for_rule(Id),
ok = unload_hooks_for_rule(Rule), true = ets:delete(?RULE_TAB, Id),
ok = clear_metrics_for_rule(RuleId), ok.
true = ets:delete(?RULE_TAB, RuleId),
ok; do_update_rule_index(#{id := Id, from := From} = Rule) ->
not_found -> ok = lists:foreach(
ok fun(Topic) ->
end. true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX)
end,
From
).
do_delete_rule_index(#{id := Id, from := From}) ->
ok = lists:foreach(
fun(Topic) ->
true = emqx_topic_index:delete(Topic, Id, ?RULE_TOPIC_INDEX)
end,
From
).
parse_actions(Actions) -> parse_actions(Actions) ->
[do_parse_action(Act) || Act <- Actions]. [do_parse_action(Act) || Act <- Actions].

View File

@ -26,6 +26,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
_ = ets:new(?RULE_TAB, [named_table, public, ordered_set, {read_concurrency, true}]), _ = ets:new(?RULE_TAB, [named_table, public, ordered_set, {read_concurrency, true}]),
_ = ets:new(?RULE_TOPIC_INDEX, [named_table, public, ordered_set, {read_concurrency, true}]),
ok = emqx_rule_events:reload(), ok = emqx_rule_events:reload(),
SupRet = emqx_rule_engine_sup:start_link(), SupRet = emqx_rule_engine_sup:start_link(),
ok = emqx_rule_engine:load_rules(), ok = emqx_rule_engine:load_rules(),