diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl index 035ff5455..99ca3e185 100644 --- a/apps/emqx/include/emqx_router.hrl +++ b/apps/emqx/include/emqx_router.hrl @@ -17,8 +17,9 @@ -ifndef(EMQX_ROUTER_HRL). -define(EMQX_ROUTER_HRL, true). -%% ETS table for message routing +%% ETS tables for message routing -define(ROUTE_TAB, emqx_route). +-define(ROUTE_TAB_UNIFIED, emqx_route_unified). %% Mnesia table for message routing -define(ROUTING_NODE, emqx_routing_node). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index a403efa84..d286af1d7 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -21,7 +21,6 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). --include_lib("mria/include/mria.hrl"). -include_lib("emqx/include/emqx_router.hrl"). %% Mnesia bootstrap @@ -73,11 +72,19 @@ code_change/3 ]). +%% test / debugging purposes +-export([is_unified_table_active/0]). + -type group() :: binary(). -type dest() :: node() | {group(), node()}. --dialyzer({nowarn_function, [cleanup_routes/1]}). +-record(routeidx, { + entry :: emqx_topic_index:key(dest()), + unused = [] :: nil() +}). + +-dialyzer({nowarn_function, [cleanup_routes_regular/1]}). %%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -97,6 +104,19 @@ mnesia(boot) -> {write_concurrency, true} ]} ]} + ]), + ok = mria:create_table(?ROUTE_TAB_UNIFIED, [ + {type, ordered_set}, + {rlog_shard, ?ROUTE_SHARD}, + {storage, ram_copies}, + {record_name, routeidx}, + {attributes, record_info(fields, routeidx)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, auto} + ]} + ]} ]). %%-------------------------------------------------------------------- @@ -130,31 +150,54 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. 
do_add_route(Topic, Dest) when is_binary(Topic) -> - Route = #route{topic = Topic, dest = Dest}, - case lists:member(Route, lookup_routes(Topic)) of + case has_route(Topic, Dest) of true -> ok; false -> ok = emqx_router_helper:monitor(Dest), - case emqx_topic:wildcard(Topic) of - true -> - Fun = fun emqx_router_utils:insert_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); - false -> - emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) - end + mria_insert_route(is_unified_table_active(), Topic, Dest) end. +mria_insert_route(_Unified = true, Topic, Dest) -> + mria_insert_route_unified(Topic, Dest); +mria_insert_route(_Unified = false, Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> + mria_insert_route_update_trie(Route); + false -> + mria_insert_route(Route) + end. + +mria_insert_route_unified(Topic, Dest) -> + K = emqx_topic_index:make_key(Topic, Dest), + mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}). + +mria_insert_route_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:insert_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD + ). + +mria_insert_route(Route) -> + mria:dirty_write(?ROUTE_TAB, Route). + %% @doc Match routes -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> - case match_trie(Topic) of - [] -> lookup_routes(Topic); - Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]]) - end. + match_routes(is_unified_table_active(), Topic). -%% Optimize: routing table will be replicated to all router nodes. -match_trie(Topic) -> +match_routes(_Unified = true, Topic) -> + [match_to_route(M) || M <- match_unified(Topic)]; +match_routes(_Unified = false, Topic) -> + lookup_routes_regular(Topic) ++ + lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). 
+ +match_unified(Topic) -> + emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []). + +match_global_trie(Topic) -> case emqx_trie:empty() of true -> []; false -> emqx_trie:match(Topic) @@ -162,12 +205,59 @@ match_trie(Topic) -> -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> + case is_unified_table_active() of + true -> + lookup_routes_unified(Topic); + false -> + lookup_routes_regular(Topic) + end. + +lookup_routes_unified(Topic) -> + Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, + [#route{topic = Topic, dest = Dest} || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]. + +lookup_routes_regular(Topic) -> ets:lookup(?ROUTE_TAB, Topic). +match_to_route(M) -> + #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. + -spec has_routes(emqx_types:topic()) -> boolean(). has_routes(Topic) when is_binary(Topic) -> + case is_unified_table_active() of + true -> + has_routes_unified(Topic); + false -> + has_routes_regular(Topic) + end. + +has_routes_unified(Topic) -> + Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'}, + case ets:match(?ROUTE_TAB_UNIFIED, Pat, 1) of + {[_], _} -> + true; + _ -> + false + end. + +has_routes_regular(Topic) -> ets:member(?ROUTE_TAB, Topic). +-spec has_route(emqx_types:topic(), dest()) -> boolean(). +has_route(Topic, Dest) -> + case is_unified_table_active() of + true -> + has_route_unified(Topic, Dest); + false -> + has_route_regular(Topic, Dest) + end. + +has_route_unified(Topic, Dest) -> + ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)). + +has_route_regular(Topic, Dest) -> + lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)). + -spec delete_route(emqx_types:topic()) -> ok | {error, term()}. delete_route(Topic) when is_binary(Topic) -> delete_route(Topic, node()). @@ -182,17 +272,54 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. 
do_delete_route(Topic, Dest) -> + mria_delete_route(is_unified_table_active(), Topic, Dest). + +mria_delete_route(_Unified = true, Topic, Dest) -> + mria_delete_route_unified(Topic, Dest); +mria_delete_route(_Unified = false, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - Fun = fun emqx_router_utils:delete_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + mria_delete_route_update_trie(Route); false -> - emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) + mria_delete_route(Route) end. +mria_delete_route_unified(Topic, Dest) -> + K = emqx_topic_index:make_key(Topic, Dest), + mria:dirty_delete(?ROUTE_TAB_UNIFIED, K). + +mria_delete_route_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:delete_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD + ). + +mria_delete_route(Route) -> + mria:dirty_delete_object(?ROUTE_TAB, Route). + +-spec is_unified_table_active() -> boolean(). +is_unified_table_active() -> + is_empty(?ROUTE_TAB) andalso + ((not is_empty(?ROUTE_TAB_UNIFIED)) orelse + emqx_config:get([broker, unified_routing_table])). + +is_empty(Tab) -> + % NOTE + % Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers + % from `{decentralized_counters, true}` which is default when `write_concurrency` is + % either `auto` or `true`. + ets:first(Tab) =:= '$end_of_table'. + -spec topics() -> list(emqx_types:topic()). topics() -> + topics(is_unified_table_active()). + +topics(_Unified = true) -> + Pat = #routeidx{entry = '$1'}, + lists:usort([emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]); +topics(_Unified = false) -> mnesia:dirty_all_keys(?ROUTE_TAB). %% @doc Print routes to a topic @@ -207,23 +334,63 @@ print_routes(Topic) -> -spec cleanup_routes(node()) -> ok. 
cleanup_routes(Node) -> + case is_unified_table_active() of + true -> + cleanup_routes_unified(Node); + false -> + cleanup_routes_regular(Node) + end. + +cleanup_routes_unified(Node) -> + % NOTE + % No point in transaction here because all the operations on unified routing table + % are dirty. + ets:foldl( + fun(#routeidx{entry = K}, ok) -> + case emqx_topic_index:get_id(K) of + Node -> mria:dirty_delete(?ROUTE_TAB_UNIFIED, K); + {_Group, Node} -> mria:dirty_delete(?ROUTE_TAB_UNIFIED, K); + _ -> + ok + end + end, + ok, + ?ROUTE_TAB_UNIFIED + ). + +cleanup_routes_regular(Node) -> Patterns = [ #route{_ = '_', dest = Node}, #route{_ = '_', dest = {'_', Node}} ], - [ - mnesia:delete_object(?ROUTE_TAB, Route, write) - || Pat <- Patterns, - Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) - ]. + mria:transaction(?ROUTE_SHARD, fun() -> + [ + mnesia:delete_object(?ROUTE_TAB, Route, write) + || Pat <- Patterns, + Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) + ] + end). -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldl_routes(FoldFun, AccIn) -> - ets:foldl(FoldFun, AccIn, ?ROUTE_TAB). + case is_unified_table_active() of + true -> + ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); + false -> + ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) + end. -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldr_routes(FoldFun, AccIn) -> - ets:foldr(FoldFun, AccIn, ?ROUTE_TAB). + case is_unified_table_active() of + true -> + ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); + false -> + ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) + end. + +mk_fold_fun_unified(FoldFun) -> + fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. call(Router, Msg) -> gen_server:call(Router, Msg, infinity). 
diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 61573fcff..77f1cd11b 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -148,11 +148,13 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> case mria_rlog:role() of core -> + % TODO + % Node may flap, do we need to wait for any pending cleanups in `init/1` + % on the flapping node? + % This also implies changing lock id to `{?LOCK, Node}`. global:trans( {?LOCK, self()}, - fun() -> - mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node]) - end + fun() -> cleanup_routes(Node) end ), ok = mria:dirty_delete(?ROUTING_NODE, Node); replicant -> diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 0b153ac01..5db8ce7cc 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -24,6 +24,8 @@ -export([match/2]). -export([matches/3]). +-export([make_key/2]). + -export([get_id/1]). -export([get_topic/1]). -export([get_record/2]). @@ -42,14 +44,18 @@ new() -> %% between regular and "materialized" indexes, for example. -spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true. insert(Filter, ID, Record, Tab) -> - Key = key(Filter, ID), + Key = make_key(Filter, ID), true = ets:insert(Tab, {Key, Record}). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. -spec delete(emqx_types:topic(), _ID, ets:table()) -> true. delete(Filter, ID, Tab) -> - true = ets:delete(Tab, key(Filter, ID)). + ets:delete(Tab, make_key(Filter, ID)). + +-spec make_key(emqx_types:topic(), ID) -> key(ID). +make_key(TopicOrFilter, ID) -> + emqx_trie_search:make_key(TopicOrFilter, ID). %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. 
@@ -84,8 +90,5 @@ get_record(K, Tab) -> [] end. -key(TopicOrFilter, ID) -> - emqx_trie_search:make_key(TopicOrFilter, ID). - make_nextf(Tab) -> fun(Key) -> ets:next(Tab, Key) end. diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 453f86257..04029e822 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -26,24 +26,37 @@ -define(R, emqx_router). -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - PrevBootModules = application:get_env(emqx, boot_modules), - emqx_common_test_helpers:boot_modules([router]), - emqx_common_test_helpers:start_apps([]), +all() -> [ - {prev_boot_modules, PrevBootModules} - | Config + {group, routing_table_regular}, + {group, routing_table_unified} ]. -end_per_suite(Config) -> - PrevBootModules = ?config(prev_boot_modules, Config), - case PrevBootModules of - undefined -> ok; - {ok, Mods} -> emqx_common_test_helpers:boot_modules(Mods) - end, - emqx_common_test_helpers:stop_apps([]). +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {routing_table_regular, [], TCs}, + {routing_table_unified, [], TCs} + ]. + +init_per_group(GroupName, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + AppSpecs = [ + {emqx, #{ + config => mk_config(GroupName), + override_env => [{boot_modules, [router]}] + }} + ], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{group_apps, Apps} | Config]. + +end_per_group(_GroupName, Config) -> + ok = emqx_cth_suite:stop(?config(group_apps, Config)). + +mk_config(routing_table_regular) -> + "broker.unified_routing_table = false"; +mk_config(routing_table_unified) -> + "broker.unified_routing_table = true". init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -177,5 +190,5 @@ t_unexpected(_) -> clear_tables() -> lists:foreach( fun mnesia:clear_table/1, - [?ROUTE_TAB, ?TRIE, emqx_trie_node] + [?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE] ). 
diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index c0796288e..12a3a34dd 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -26,32 +26,38 @@ -define(ROUTER_HELPER, emqx_router_helper). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, routing_table_regular}, + {group, routing_table_unified} + ]. -init_per_suite(Config) -> - DistPid = - case net_kernel:nodename() of - ignored -> - %% calling `net_kernel:start' without `epmd' - %% running will result in a failure. - emqx_common_test_helpers:start_epmd(), - {ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]), - Pid; - _ -> - undefined - end, - emqx_common_test_helpers:start_apps([]), - [{dist_pid, DistPid} | Config]. +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {routing_table_regular, [], TCs}, + {routing_table_unified, [], TCs} + ]. -end_per_suite(Config) -> - DistPid = ?config(dist_pid, Config), - case DistPid of - Pid when is_pid(Pid) -> - net_kernel:stop(); - _ -> - ok - end, - emqx_common_test_helpers:stop_apps([]). +init_per_group(GroupName, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + AppSpecs = [{emqx, mk_config(GroupName)}], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{group_name, GroupName}, {group_apps, Apps} | Config]. + +end_per_group(_GroupName, Config) -> + ok = emqx_cth_suite:stop(?config(group_apps, Config)). + +mk_config(routing_table_regular) -> + #{ + config => "broker.unified_routing_table = false", + override_env => [{boot_modules, [router]}] + }; +mk_config(routing_table_unified) -> + #{ + config => "broker.unified_routing_table = true", + override_env => [{boot_modules, [router]}] + }. 
init_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_membership_mnesia_down; @@ -59,7 +65,16 @@ init_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> ok = snabbkaffe:start_trace(), - Slave = emqx_common_test_helpers:start_slave(some_node, []), + WorkDir = filename:join([?config(priv_dir, Config), ?config(group_name, Config), TestCase]), + [Slave] = emqx_cth_cluster:start( + [ + {?MODULE, #{ + apps => [{emqx, mk_config(?config(group_name, Config))}], + join_to => node() + }} + ], + #{work_dir => WorkDir} + ), [{slave, Slave} | Config]; init_per_testcase(_TestCase, Config) -> Config. @@ -70,9 +85,8 @@ end_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> Slave = ?config(slave, Config), - emqx_common_test_helpers:stop_slave(Slave), - mria:clear_table(?ROUTE_TAB), - snabbkaffe:stop(), + ok = emqx_cth_cluster:stop([Slave]), + ok = snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> ok.