perf(ps_router): only check if there are any routes when deciding whether to persist message

This commit is contained in:
Thales Macedo Garitezi 2023-09-12 16:29:43 -03:00
parent cae9ae1fab
commit e081abb9e6
4 changed files with 29 additions and 6 deletions

View File

@ -71,11 +71,9 @@ init() ->
ok | {skipped, _Reason} | {error, _TODO}.
persist_message(Msg) ->
?WHEN_ENABLED(
case needs_persistence(Msg) andalso find_subscribers(Msg) of
[_ | _] ->
case needs_persistence(Msg) andalso has_subscribers(Msg) of
true ->
store_message(Msg);
[] ->
{skipped, no_subscribers};
false ->
{skipped, needs_no_persistence}
end
@ -90,8 +88,8 @@ store_message(Msg) ->
Topic = emqx_topic:words(emqx_message:topic(Msg)),
emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
find_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:match_routes(Topic).
has_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:has_any_route(Topic).
open_session(ClientID) ->
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).

View File

@ -25,6 +25,7 @@
-export([
do_add_route/2,
do_delete_route/2,
has_any_route/1,
match_routes/1,
lookup_routes/1,
foldr_routes/2,
@ -101,6 +102,19 @@ do_delete_route(Topic, Dest) ->
mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
end.
%% @doc Takes a real topic (not filter) as input, and returns whether there is any
%% matching filters.
-spec has_any_route(emqx_types:topic()) -> boolean().
has_any_route(Topic) ->
DirectTopicMatch = lookup_route_tab(Topic),
WildcardMatch = emqx_topic_index:match(Topic, ?PS_FILTERS_TAB),
case {DirectTopicMatch, WildcardMatch} of
{[], false} ->
false;
{_, _} ->
true
end.
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
%% filters associated with route destination.
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].

View File

@ -66,6 +66,7 @@ match(Topic, Tab) ->
%% @doc Match given topic against the index and return _all_ matches.
%% If `unique` option is given, return only unique matches by record ID.
-spec matches(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [key(_ID)].
matches(Topic, Tab, Opts) ->
emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).

View File

@ -80,12 +80,21 @@ delete_route(TopicFilter) ->
% error('TODO').
t_add_delete(_) ->
?assertNot(?R:has_any_route(<<"a/b/c">>)),
add_route(<<"a/b/c">>),
?assert(?R:has_any_route(<<"a/b/c">>)),
add_route(<<"a/b/c">>),
?assert(?R:has_any_route(<<"a/b/c">>)),
add_route(<<"a/+/b">>),
?assert(?R:has_any_route(<<"a/b/c">>)),
?assert(?R:has_any_route(<<"a/c/b">>)),
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
delete_route(<<"a/b/c">>),
?assertNot(?R:has_any_route(<<"a/b/c">>)),
?assert(?R:has_any_route(<<"a/c/b">>)),
delete_route(<<"a/+/b">>),
?assertNot(?R:has_any_route(<<"a/b/c">>)),
?assertNot(?R:has_any_route(<<"a/c/b">>)),
?assertEqual([], ?R:topics()).
t_add_delete_incremental(_) ->
@ -94,6 +103,7 @@ t_add_delete_incremental(_) ->
add_route(<<"a/+/+">>),
add_route(<<"a/b/#">>),
add_route(<<"#">>),
?assert(?R:has_any_route(<<"any/topic">>)),
?assertEqual(
[
#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},