perf: use mria:match_delete/2 to cleanup routes

This must be much more network efficient since both Mria and Mnesia
need only to replicate one op, e.g.: `{MatchPattern, clear_table}`
instead of replicating one per each key to be deleted.
This commit is contained in:
Serge Tupchii 2023-12-18 21:23:45 +02:00
parent febaaefc38
commit 965ce5d446
3 changed files with 86 additions and 10 deletions

View File

@ -95,6 +95,18 @@
unused = [] :: nil() unused = [] :: nil()
}). }).
-define(node_patterns(Node), [Node, {'_', Node}]).
-define(UNSUPPORTED, unsupported).
-define(with_fallback(Expr, FallbackExpr),
try
Expr
catch
throw:?UNSUPPORTED -> FallbackExpr
end
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -293,8 +305,6 @@ pick(Topic) ->
%% Schema v1 %% Schema v1
%% -------------------------------------------------------------------- %% --------------------------------------------------------------------
-dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
mria_insert_route_v1(Topic, Dest) -> mria_insert_route_v1(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
@ -356,10 +366,18 @@ has_route_tab_entry(Topic, Dest) ->
[] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}). [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
cleanup_routes_v1(Node) -> cleanup_routes_v1(Node) ->
Patterns = [ ?with_fallback(
#route{_ = '_', dest = Node}, lists:foreach(
#route{_ = '_', dest = {'_', Node}} fun(Pattern) ->
], throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
end,
?node_patterns(Node)
),
cleanup_routes_v1_fallback(Node)
).
cleanup_routes_v1_fallback(Node) ->
Patterns = [make_route_rec_pat(P) || P <- ?node_patterns(Node)],
mria:transaction(?ROUTE_SHARD, fun() -> mria:transaction(?ROUTE_SHARD, fun() ->
[ [
mnesia:delete_object(?ROUTE_TAB, Route, write) mnesia:delete_object(?ROUTE_TAB, Route, write)
@ -435,8 +453,25 @@ has_route_v2(Topic, Dest) ->
end. end.
cleanup_routes_v2(Node) -> cleanup_routes_v2(Node) ->
% NOTE ?with_fallback(
% No point in transaction here because all the operations on filters table are dirty. lists:foreach(
fun(Pattern) ->
_ = throw_unsupported(
mria:match_delete(
?ROUTE_TAB_FILTERS,
#routeidx{entry = emqx_trie_search:make_pat('_', Pattern)}
)
),
throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
end,
?node_patterns(Node)
),
cleanup_routes_v2_fallback(Node)
).
cleanup_routes_v2_fallback(Node) ->
%% NOTE
%% No point in transaction here because all the operations on filters table are dirty.
ok = ets:foldl( ok = ets:foldl(
fun(#routeidx{entry = K}, ok) -> fun(#routeidx{entry = K}, ok) ->
case get_dest_node(emqx_topic_index:get_id(K)) of case get_dest_node(emqx_topic_index:get_id(K)) of
@ -467,6 +502,19 @@ get_dest_node({_, Node}) ->
get_dest_node(Node) -> get_dest_node(Node) ->
Node. Node.
throw_unsupported({error, unsupported_otp_version}) ->
throw(?UNSUPPORTED);
throw_unsupported(Other) ->
Other.
%% Make Dialyzer happy
make_route_rec_pat(DestPattern) ->
erlang:make_tuple(
record_info(size, route),
'_',
[{1, route}, {#route.dest, DestPattern}]
).
select_v2(Spec, Limit, undefined) -> select_v2(Spec, Limit, undefined) ->
Stream = mk_route_stream(Spec), Stream = mk_route_stream(Spec),
select_next(Limit, Stream); select_next(Limit, Stream);

View File

@ -35,16 +35,32 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
[ [
{routing_schema_v1, [], TCs}, {routing_schema_v1, [], [
{routing_schema_v2, [], TCs} {mria_match_delete, [], TCs},
{fallback, [], TCs}
]},
{routing_schema_v2, [], [
{mria_match_delete, [], TCs},
{fallback, [], TCs}
]}
]. ].
init_per_group(fallback, Config) ->
ok = mock_mria_match_delete(),
Config;
init_per_group(mria_match_delete, Config) ->
Config;
init_per_group(GroupName, Config) -> init_per_group(GroupName, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
AppSpecs = [{emqx, mk_config(GroupName)}], AppSpecs = [{emqx, mk_config(GroupName)}],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_name, GroupName}, {group_apps, Apps} | Config]. [{group_name, GroupName}, {group_apps, Apps} | Config].
end_per_group(fallback, _Config) ->
unmock_mria_match_delete(),
ok;
end_per_group(mria_match_delete, _Config) ->
ok;
end_per_group(_GroupName, Config) -> end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)). ok = emqx_cth_suite:stop(?config(group_apps, Config)).
@ -59,6 +75,13 @@ mk_config(routing_schema_v2) ->
override_env => [{boot_modules, [broker]}] override_env => [{boot_modules, [broker]}]
}. }.
mock_mria_match_delete() ->
ok = meck:new(mria, [no_link, passthrough]),
ok = meck:expect(mria, match_delete, fun(_, _) -> {error, unsupported_otp_version} end).
unmock_mria_match_delete() ->
ok = meck:unload(mria).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
Config. Config.

View File

@ -0,0 +1,5 @@
Improve network efficiency during routes cleanup.
Previously, when a node node was down, a delete operation for every route to that node must have been exchanged between all other live nodes.
After this change, only one 'match and delete' operation is exchanged between all live nodes, meaning that much fewer packets are to be sent over inter-cluster network.
This optimization must be especially helpful for geo-distributed EMQX deployments, when network latency can be significantly high.