feat(router): add unified routing table

This commit is contained in:
Andrew Mayorov 2023-08-25 14:49:01 +04:00
parent 166375a000
commit 33e5e1ba57
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
6 changed files with 280 additions and 80 deletions

View File

@ -17,8 +17,9 @@
-ifndef(EMQX_ROUTER_HRL).
-define(EMQX_ROUTER_HRL, true).
%% ETS table for message routing
%% ETS tables for message routing
-define(ROUTE_TAB, emqx_route).
-define(ROUTE_TAB_UNIFIED, emqx_route_unified).
%% Mnesia table for message routing
-define(ROUTING_NODE, emqx_routing_node).

View File

@ -21,7 +21,6 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("mria/include/mria.hrl").
-include_lib("emqx/include/emqx_router.hrl").
%% Mnesia bootstrap
@ -73,11 +72,19 @@
code_change/3
]).
%% test / debugging purposes
-export([is_unified_table_active/0]).
-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-dialyzer({nowarn_function, [cleanup_routes/1]}).
-record(routeidx, {
entry :: emqx_topic_index:key(dest()),
unused = [] :: nil()
}).
-dialyzer({nowarn_function, [cleanup_routes_regular/1]}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
@ -97,6 +104,19 @@ mnesia(boot) ->
{write_concurrency, true}
]}
]}
]),
ok = mria:create_table(?ROUTE_TAB_UNIFIED, [
{type, ordered_set},
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
{record_name, routeidx},
{attributes, record_info(fields, routeidx)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, auto}
]}
]}
]).
%%--------------------------------------------------------------------
@ -130,31 +150,54 @@ do_add_route(Topic) when is_binary(Topic) ->
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_add_route(Topic, Dest) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = Dest},
case lists:member(Route, lookup_routes(Topic)) of
case has_route(Topic, Dest) of
true ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
end
mria_insert_route(is_unified_table_active(), Topic, Dest)
end.
mria_insert_route(_Unified = true, Topic, Dest) ->
mria_insert_route_unified(Topic, Dest);
mria_insert_route(_Unified = false, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
mria_insert_route_update_trie(Route);
false ->
mria_insert_route(Route)
end.
mria_insert_route_unified(Topic, Dest) ->
K = emqx_topic_index:make_key(Topic, Dest),
mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}).
mria_insert_route_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:insert_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_insert_route(Route) ->
mria:dirty_write(?ROUTE_TAB, Route).
%% @doc Match routes
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) when is_binary(Topic) ->
case match_trie(Topic) of
[] -> lookup_routes(Topic);
Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
end.
match_routes(is_unified_table_active(), Topic).
%% Optimize: routing table will be replicated to all router nodes.
match_trie(Topic) ->
match_routes(_Unified = true, Topic) ->
[match_to_route(M) || M <- match_unified(Topic)];
match_routes(_Unified = false, Topic) ->
lookup_routes_regular(Topic) ++
lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)).
match_unified(Topic) ->
emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []).
match_global_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> emqx_trie:match(Topic)
@ -162,12 +205,59 @@ match_trie(Topic) ->
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) ->
case is_unified_table_active() of
true ->
lookup_routes_unified(Topic);
false ->
lookup_routes_regular(Topic)
end.
lookup_routes_unified(Topic) ->
Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
[Dest || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)].
lookup_routes_regular(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
match_to_route(M) ->
#route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
-spec has_routes(emqx_types:topic()) -> boolean().
has_routes(Topic) when is_binary(Topic) ->
case is_unified_table_active() of
true ->
has_routes_unified(Topic);
false ->
has_routes_regular(Topic)
end.
has_routes_unified(Topic) ->
Pat = #routeidx{entry = emqx_topic_index:mk_key(Topic, '$1'), _ = '_'},
case ets:match(?ROUTE_TAB_UNIFIED, Pat, 1) of
{[_], _} ->
true;
_ ->
false
end.
has_routes_regular(Topic) ->
ets:member(?ROUTE_TAB, Topic).
-spec has_route(emqx_types:topic(), dest()) -> boolean().
has_route(Topic, Dest) ->
case is_unified_table_active() of
true ->
has_route_unified(Topic, Dest);
false ->
has_route_regular(Topic, Dest)
end.
has_route_unified(Topic, Dest) ->
ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)).
has_route_regular(Topic, Dest) ->
lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)).
-spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
delete_route(Topic) when is_binary(Topic) ->
delete_route(Topic, node()).
@ -182,17 +272,54 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) ->
mria_delete_route(is_unified_table_active(), Topic, Dest).
mria_delete_route(_Unified = true, Topic, Dest) ->
mria_delete_route_unified(Topic, Dest);
mria_delete_route(_Unified = false, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
mria_delete_route_update_trie(Route);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
mria_delete_route(Route)
end.
mria_delete_route_unified(Topic, Dest) ->
K = emqx_topic_index:make_key(Topic, Dest),
mria:dirty_delete(?ROUTE_TAB_UNIFIED, K).
mria_delete_route_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:delete_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_delete_route(Route) ->
mria:dirty_delete_object(?ROUTE_TAB, Route).
-spec is_unified_table_active() -> boolean().
is_unified_table_active() ->
is_empty(?ROUTE_TAB) andalso
((not is_empty(?ROUTE_TAB_UNIFIED)) orelse
emqx_config:get([broker, unified_routing_table])).
is_empty(Tab) ->
% NOTE
% Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers
% from `{decentralized_counters, true}` which is default when `write_concurrency` is
% either `auto` or `true`.
ets:first(Tab) =:= '$end_of_table'.
-spec topics() -> list(emqx_types:topic()).
topics() ->
topics(is_unified_table_active()).
topics(_Unified = true) ->
Pat = #routeidx{entry = '$1'},
[emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)];
topics(_Unified = false) ->
mnesia:dirty_all_keys(?ROUTE_TAB).
%% @doc Print routes to a topic
@ -207,23 +334,63 @@ print_routes(Topic) ->
-spec cleanup_routes(node()) -> ok.
cleanup_routes(Node) ->
case is_unified_table_active() of
true ->
cleanup_routes_unified(Node);
false ->
cleanup_routes_regular(Node)
end.
cleanup_routes_unified(Node) ->
% NOTE
% No point in transaction here because all the operations on unified routing table
% are dirty.
ets:foldl(
fun(#routeidx{entry = K}, ok) ->
case emqx_topic_index:get_id(K) of
Node ->
mria:dirty_delete(?ROUTE_TAB_UNIFIED, K);
_ ->
ok
end
end,
ok,
?ROUTE_TAB_UNIFIED
).
cleanup_routes_regular(Node) ->
Patterns = [
#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}
],
[
mnesia:delete_object(?ROUTE_TAB, Route, write)
|| Pat <- Patterns,
Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
].
mria:transaction(?ROUTE_SHARD, fun() ->
[
mnesia:delete_object(?ROUTE_TAB, Route, write)
|| Pat <- Patterns,
Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
]
end).
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldl_routes(FoldFun, AccIn) ->
ets:foldl(FoldFun, AccIn, ?ROUTE_TAB).
case is_unified_table_active() of
true ->
ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
false ->
ets:foldl(FoldFun, AccIn, ?ROUTE_TAB)
end.
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldr_routes(FoldFun, AccIn) ->
ets:foldr(FoldFun, AccIn, ?ROUTE_TAB).
case is_unified_table_active() of
true ->
ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
false ->
ets:foldr(FoldFun, AccIn, ?ROUTE_TAB)
end.
mk_fold_fun_unified(FoldFun) ->
fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).

View File

@ -148,11 +148,13 @@ handle_info({mnesia_table_event, Event}, State) ->
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
case mria_rlog:role() of
core ->
% TODO
% Node may flap, do we need to wait for any pending cleanups in `init/1`
% on the flapping node?
% This also implies changing lock id to `{?LOCK, Node}`.
global:trans(
{?LOCK, self()},
fun() ->
mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node])
end
fun() -> cleanup_routes(Node) end
),
ok = mria:dirty_delete(?ROUTING_NODE, Node);
replicant ->

View File

@ -24,6 +24,8 @@
-export([match/2]).
-export([matches/3]).
-export([make_key/2]).
-export([get_id/1]).
-export([get_topic/1]).
-export([get_record/2]).
@ -42,14 +44,18 @@ new() ->
%% between regular and "materialized" indexes, for example.
-spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true.
insert(Filter, ID, Record, Tab) ->
Key = key(Filter, ID),
Key = make_key(Filter, ID),
true = ets:insert(Tab, {Key, Record}).
%% @doc Delete an entry from the index that associates given topic filter to given
%% record ID. Deleting non-existing entry is not an error.
-spec delete(emqx_types:topic(), _ID, ets:table()) -> true.
delete(Filter, ID, Tab) ->
true = ets:delete(Tab, key(Filter, ID)).
ets:delete(Tab, make_key(Filter, ID)).
-spec make_key(emqx_types:topic(), ID) -> key(ID).
make_key(TopicOrFilter, ID) ->
emqx_trie_search:make_key(TopicOrFilter, ID).
%% @doc Match given topic against the index and return the first match, or `false` if
%% no match is found.
@ -84,8 +90,5 @@ get_record(K, Tab) ->
[]
end.
key(TopicOrFilter, ID) ->
emqx_trie_search:make_key(TopicOrFilter, ID).
make_nextf(Tab) ->
fun(Key) -> ets:next(Tab, Key) end.

View File

@ -26,24 +26,37 @@
-define(R, emqx_router).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
PrevBootModules = application:get_env(emqx, boot_modules),
emqx_common_test_helpers:boot_modules([router]),
emqx_common_test_helpers:start_apps([]),
all() ->
[
{prev_boot_modules, PrevBootModules}
| Config
{group, routing_table_regular},
{group, routing_table_unified}
].
end_per_suite(Config) ->
PrevBootModules = ?config(prev_boot_modules, Config),
case PrevBootModules of
undefined -> ok;
{ok, Mods} -> emqx_common_test_helpers:boot_modules(Mods)
end,
emqx_common_test_helpers:stop_apps([]).
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_table_regular, [], TCs},
{routing_table_unified, [], TCs}
].
init_per_group(GroupName, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), GroupName]),
AppSpecs = [
{emqx, #{
config => mk_config(GroupName),
override_env => [{boot_modules, [router]}]
}}
],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_apps, Apps} | Config].
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_table_regular) ->
"broker.unified_routing_table = false";
mk_config(routing_table_unified) ->
"broker.unified_routing_table = true".
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@ -177,5 +190,5 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(
fun mnesia:clear_table/1,
[?ROUTE_TAB, ?TRIE, emqx_trie_node]
[?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE]
).

View File

@ -26,32 +26,38 @@
-define(ROUTER_HELPER, emqx_router_helper).
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[
{group, routing_table_regular},
{group, routing_table_unified}
].
init_per_suite(Config) ->
DistPid =
case net_kernel:nodename() of
ignored ->
%% calling `net_kernel:start' without `epmd'
%% running will result in a failure.
emqx_common_test_helpers:start_epmd(),
{ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]),
Pid;
_ ->
undefined
end,
emqx_common_test_helpers:start_apps([]),
[{dist_pid, DistPid} | Config].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_table_regular, [], TCs},
{routing_table_unified, [], TCs}
].
end_per_suite(Config) ->
DistPid = ?config(dist_pid, Config),
case DistPid of
Pid when is_pid(Pid) ->
net_kernel:stop();
_ ->
ok
end,
emqx_common_test_helpers:stop_apps([]).
init_per_group(GroupName, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), GroupName]),
AppSpecs = [{emqx, mk_config(GroupName)}],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_name, GroupName}, {group_apps, Apps} | Config].
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_table_regular) ->
#{
config => "broker.unified_routing_table = false",
override_env => [{boot_modules, [router]}]
};
mk_config(routing_table_unified) ->
#{
config => "broker.unified_routing_table = true",
override_env => [{boot_modules, [router]}]
}.
init_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_membership_mnesia_down;
@ -59,7 +65,16 @@ init_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_monitor_node_down
->
ok = snabbkaffe:start_trace(),
Slave = emqx_common_test_helpers:start_slave(some_node, []),
WorkDir = filename:join([?config(priv_dir, Config), ?config(group_name, Config), TestCase]),
[Slave] = emqx_cth_cluster:start(
[
{?MODULE, #{
apps => [{emqx, mk_config(?config(group_name, Config))}],
join_to => node()
}}
],
#{work_dir => WorkDir}
),
[{slave, Slave} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
@ -70,9 +85,8 @@ end_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_monitor_node_down
->
Slave = ?config(slave, Config),
emqx_common_test_helpers:stop_slave(Slave),
mria:clear_table(?ROUTE_TAB),
snabbkaffe:stop(),
ok = emqx_cth_cluster:stop([Slave]),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.