diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index defa398a8..13efbe4ea 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -95,6 +95,18 @@ unused = [] :: nil() }). +-define(node_patterns(Node), [Node, {'_', Node}]). + +-define(UNSUPPORTED, unsupported). + +-define(with_fallback(Expr, FallbackExpr), + try + Expr + catch + throw:?UNSUPPORTED -> FallbackExpr + end +). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -293,8 +305,6 @@ pick(Topic) -> %% Schema v1 %% -------------------------------------------------------------------- --dialyzer({nowarn_function, [cleanup_routes_v1/1]}). - mria_insert_route_v1(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of @@ -356,10 +366,18 @@ has_route_tab_entry(Topic, Dest) -> [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}). cleanup_routes_v1(Node) -> - Patterns = [ - #route{_ = '_', dest = Node}, - #route{_ = '_', dest = {'_', Node}} - ], + ?with_fallback( + lists:foreach( + fun(Pattern) -> + throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern))) + end, + ?node_patterns(Node) + ), + cleanup_routes_v1_fallback(Node) + ). + +cleanup_routes_v1_fallback(Node) -> + Patterns = [make_route_rec_pat(P) || P <- ?node_patterns(Node)], mria:transaction(?ROUTE_SHARD, fun() -> [ mnesia:delete_object(?ROUTE_TAB, Route, write) @@ -435,8 +453,25 @@ has_route_v2(Topic, Dest) -> end. cleanup_routes_v2(Node) -> - % NOTE - % No point in transaction here because all the operations on filters table are dirty. + ?with_fallback( + lists:foreach( + fun(Pattern) -> + _ = throw_unsupported( + mria:match_delete( + ?ROUTE_TAB_FILTERS, + #routeidx{entry = emqx_trie_search:make_pat('_', Pattern)} + ) + ), + throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern))) + end, + ?node_patterns(Node) + ), + cleanup_routes_v2_fallback(Node) + ). + +cleanup_routes_v2_fallback(Node) -> + %% NOTE + %% No point in transaction here because all the operations on filters table are dirty. ok = ets:foldl( fun(#routeidx{entry = K}, ok) -> case get_dest_node(emqx_topic_index:get_id(K)) of @@ -467,6 +502,19 @@ get_dest_node({_, Node}) -> get_dest_node(Node) -> Node. +throw_unsupported({error, unsupported_otp_version}) -> + throw(?UNSUPPORTED); +throw_unsupported(Other) -> + Other. + +%% Make Dialyzer happy +make_route_rec_pat(DestPattern) -> + erlang:make_tuple( + record_info(size, route), + '_', + [{1, route}, {#route.dest, DestPattern}] + ). + select_v2(Spec, Limit, undefined) -> Stream = mk_route_stream(Spec), select_next(Limit, Stream); diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index c16277884..be72d0e26 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -35,16 +35,32 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), [ - {routing_schema_v1, [], TCs}, - {routing_schema_v2, [], TCs} + {routing_schema_v1, [], [ + {mria_match_delete, [], TCs}, + {fallback, [], TCs} + ]}, + {routing_schema_v2, [], [ + {mria_match_delete, [], TCs}, + {fallback, [], TCs} + ]} ]. +init_per_group(fallback, Config) -> + ok = mock_mria_match_delete(), + Config; +init_per_group(mria_match_delete, Config) -> + Config; init_per_group(GroupName, Config) -> WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), AppSpecs = [{emqx, mk_config(GroupName)}], Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), [{group_name, GroupName}, {group_apps, Apps} | Config]. +end_per_group(fallback, _Config) -> + unmock_mria_match_delete(), + ok; +end_per_group(mria_match_delete, _Config) -> + ok; end_per_group(_GroupName, Config) -> ok = emqx_cth_suite:stop(?config(group_apps, Config)). @@ -59,6 +75,13 @@ mk_config(routing_schema_v2) -> override_env => [{boot_modules, [broker]}] }. +mock_mria_match_delete() -> + ok = meck:new(mria, [no_link, passthrough]), + ok = meck:expect(mria, match_delete, fun(_, _) -> {error, unsupported_otp_version} end). + +unmock_mria_match_delete() -> + ok = meck:unload(mria). + init_per_testcase(_TestCase, Config) -> ok = snabbkaffe:start_trace(), Config. diff --git a/changes/ce/perf-12196.en.md b/changes/ce/perf-12196.en.md new file mode 100644 index 000000000..e9c26c79e --- /dev/null +++ b/changes/ce/perf-12196.en.md @@ -0,0 +1,5 @@ +Improve network efficiency during routes cleanup. + +Previously, when a node node was down, a delete operation for every route to that node must have been exchanged between all other live nodes. +After this change, only one 'match and delete' operation is exchanged between all live nodes, meaning that much fewer packets are to be sent over inter-cluster network. +This optimization must be especially helpful for geo-distributed EMQX deployments, when network latency can be significantly high.