From fc7b4c0009d0e43efc7d8369d2cd189f531dde6f Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Mon, 30 Aug 2021 10:41:15 +0200 Subject: [PATCH] refactor: make it possible to use different tries In preparation for persistent sessions --- apps/emqx/src/emqx_router.erl | 109 ++---------------------- apps/emqx/src/emqx_router_utils.erl | 126 ++++++++++++++++++++++++++++ apps/emqx/src/emqx_trie.erl | 102 ++++++++++++---------- apps/emqx/test/emqx_trie_SUITE.erl | 2 +- 4 files changed, 191 insertions(+), 148 deletions(-) create mode 100644 apps/emqx/src/emqx_router_utils.erl diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3337b57c8..81d3786e3 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -116,8 +116,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun insert_trie_route/1, [Route]); - false -> insert_direct_route(Route) + Fun = fun emqx_router_utils:insert_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) end end. @@ -162,8 +164,10 @@ do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun delete_trie_route/1, [Route]); - false -> delete_direct_route(Route) + Fun = fun emqx_router_utils:delete_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) end. -spec(topics() -> list(emqx_types:topic())). @@ -216,100 +220,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -insert_direct_route(Route) -> - mria:dirty_write(?ROUTE_TAB, Route). - -insert_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [] -> emqx_trie:insert(Topic); - _ -> ok - end, - mnesia:write(?ROUTE_TAB, Route, sticky_write). - -delete_direct_route(Route) -> - mria:dirty_delete_object(?ROUTE_TAB, Route). - -delete_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [Route] -> %% Remove route and trie - ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), - emqx_trie:delete(Topic); - [_|_] -> %% Remove route only - mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); - [] -> ok - end. - -%% @private --spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). -maybe_trans(Fun, Args) -> - case emqx:get_config([broker, perf, route_lock_type]) of - key -> - trans(Fun, Args); - global -> - %% Assert: - mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash - lock_router(), - try mnesia:sync_dirty(Fun, Args) - after - unlock_router() - end; - tab -> - trans(fun() -> - emqx_trie:lock_tables(), - apply(Fun, Args) - end, []) - end. - -%% The created fun only terminates with explicit exception --dialyzer({nowarn_function, [trans/2]}). - --spec(trans(function(), list(any())) -> ok | {error, term()}). -trans(Fun, Args) -> - {WPid, RefMon} = - spawn_monitor( - %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mria:transaction/2. - %% Future changes should keep in mind that this process - %% always exit with database write result. - fun() -> - Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} - end, - exit({shutdown, Res}) - end), - %% Receive a 'shutdown' exit to pass result from the short-lived process. - %% so the receive below can be receive-mark optimized by the compiler. - %% - %% If the result is sent as a regular message, we'll have to - %% either demonitor (with flush which is essentially a 'receive' since - %% the process is no longer alive after the result has been received), - %% or use a plain 'receive' to drain the normal 'DOWN' message. - %% However the compiler does not optimize this second 'receive'. - receive - {'DOWN', RefMon, process, WPid, Info} -> - case Info of - {shutdown, Result} -> Result; - _ -> {error, {trans_crash, Info}} - end - end. - -lock_router() -> - %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. - %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. - case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of - false -> - %% Force to sleep 1ms instead. - timer:sleep(1), - lock_router(); - true -> - ok - end. - -unlock_router() -> - global:del_lock({?MODULE, self()}). diff --git a/apps/emqx/src/emqx_router_utils.erl b/apps/emqx/src/emqx_router_utils.erl new file mode 100644 index 000000000..147e0e979 --- /dev/null +++ b/apps/emqx/src/emqx_router_utils.erl @@ -0,0 +1,126 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_utils). + +-include("emqx.hrl"). + +-export([ delete_direct_route/2 + , delete_trie_route/2 + , insert_direct_route/2 + , insert_trie_route/2 + , maybe_trans/3 + ]). + +insert_direct_route(Tab, Route) -> + mria:dirty_write(Tab, Route). + +insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic); + [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic); + _ -> ok + end, + mnesia:write(RouteTab, Route, sticky_write). + +delete_direct_route(RouteTab, Route) -> + mria:dirty_delete_object(RouteTab, Route). + +delete_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [R] when R =:= Route -> + %% Remove route and trie + ok = mnesia:delete_object(RouteTab, Route, sticky_write), + case RouteTab of + emqx_route -> emqx_trie:delete(Topic); + emqx_session_route -> emqx_trie:delete_session(Topic) + end; + [_|_] -> + %% Remove route only + mnesia:delete_object(RouteTab, Route, sticky_write); + [] -> + ok + end. + +%% @private +-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}). +maybe_trans(Fun, Args, Shard) -> + case emqx:get_config([broker, perf, route_lock_type]) of + key -> + trans(Fun, Args, Shard); + global -> + %% Assert: + mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash + lock_router(Shard), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router(Shard) + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, [], Shard) + end. + +%% The created fun only terminates with explicit exception +-dialyzer({nowarn_function, [trans/3]}). + +-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}). +trans(Fun, Args, Shard) -> + {WPid, RefMon} = + spawn_monitor( + %% NOTE: this is under the assumption that crashes in Fun + %% are caught by mnesia:transaction/2. + %% Future changes should keep in mind that this process + %% always exit with database write result. + fun() -> + Res = case mria:transaction(Shard, Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end, + exit({shutdown, Res}) + end), + %% Receive a 'shutdown' exit to pass result from the short-lived process. + %% so the receive below can be receive-mark optimized by the compiler. + %% + %% If the result is sent as a regular message, we'll have to + %% either demonitor (with flush which is essentially a 'receive' since + %% the process is no longer alive after the result has been received), + %% or use a plain 'receive' to drain the normal 'DOWN' message. + %% However the compiler does not optimize this second 'receive'. + receive + {'DOWN', RefMon, process, WPid, Info} -> + case Info of + {shutdown, Result} -> Result; + _ -> {error, {trans_crash, Info}} + end + end. + +lock_router(Shard) -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(Shard); + true -> + ok + end. + +unlock_router(Shard) -> + global:del_lock({{?MODULE, Shard}, self()}). diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 4354a2bab..ae439e56a 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -75,24 +75,32 @@ mnesia(boot) -> %% @doc Insert a topic filter into the trie. -spec(insert(emqx_types:topic()) -> ok). insert(Topic) when is_binary(Topic) -> + insert(Topic, ?TRIE). + +insert(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case mnesia:wread({?TRIE, TopicKey}) of + case mnesia:wread({Trie, TopicKey}) of [_] -> ok; %% already inserted - [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys]) + [] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys]) end. %% @doc Delete a topic filter from the trie. -spec(delete(emqx_types:topic()) -> ok). delete(Topic) when is_binary(Topic) -> + delete(Topic, ?TRIE). + +delete(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case [] =/= mnesia:wread({?TRIE, TopicKey}) of - true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]); + case [] =/= mnesia:wread({Trie, TopicKey}) of + true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]); false -> ok end. %% @doc Find trie nodes that matchs the topic name. -spec(match(emqx_types:topic()) -> list(emqx_types:topic())). match(Topic) when is_binary(Topic) -> + match(Topic, ?TRIE). +match(Topic, Trie) when is_binary(Topic) -> Words = emqx_topic:words(Topic), case emqx_topic:wildcard(Words) of true -> @@ -105,12 +113,14 @@ match(Topic) when is_binary(Topic) -> %% Such clients will get disconnected. []; false -> - do_match(Words) + do_match(Words, Trie) end. %% @doc Is the trie empty? -spec(empty() -> boolean()). -empty() -> ets:first(?TRIE) =:= '$end_of_table'. +empty() -> empty(?TRIE). + +empty(Trie) -> ets:first(Trie) =:= '$end_of_table'. -spec lock_tables() -> ok. lock_tables() -> @@ -163,70 +173,70 @@ make_prefixes([H | T], Prefix0, Acc0) -> Acc = [Prefix | Acc0], make_prefixes(T, Prefix, Acc). -insert_key(Key) -> - T = case mnesia:wread({?TRIE, Key}) of +insert_key(Key, Trie) -> + T = case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T1] -> T1#?TRIE{count = C + 1}; [] -> #?TRIE{key = Key, count = 1} end, - ok = mnesia:write(T). + ok = mnesia:write(Trie, T, write). -delete_key(Key) -> - case mnesia:wread({?TRIE, Key}) of +delete_key(Key, Trie) -> + case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T] when C > 1 -> - ok = mnesia:write(T#?TRIE{count = C - 1}); + ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write); [_] -> - ok = mnesia:delete(?TRIE, Key, write); + ok = mnesia:delete(Trie, Key, write); [] -> ok end. %% micro-optimization: no need to lookup when topic is not wildcard %% because we only insert wildcards to emqx_trie -lookup_topic(_Topic, false) -> []; -lookup_topic(Topic, true) -> lookup_topic(Topic). +lookup_topic(_Topic,_Trie, false) -> []; +lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie). -lookup_topic(Topic) when is_binary(Topic) -> - case ets:lookup(?TRIE, ?TOPIC(Topic)) of +lookup_topic(Topic, Trie) when is_binary(Topic) -> + case ets:lookup(Trie, ?TOPIC(Topic)) of [#?TRIE{count = C}] -> [Topic || C > 0]; [] -> [] end. -has_prefix(empty) -> true; %% this is the virtual tree root -has_prefix(Prefix) -> - case ets:lookup(?TRIE, ?PREFIX(Prefix)) of +has_prefix(empty, _Trie) -> true; %% this is the virtual tree root +has_prefix(Prefix, Trie) -> + case ets:lookup(Trie, ?PREFIX(Prefix)) of [#?TRIE{count = C}] -> C > 0; [] -> false end. -do_match([<<"$", _/binary>> = Prefix | Words]) -> +do_match([<<"$", _/binary>> = Prefix | Words], Trie) -> %% For topics having dollar sign prefix, %% we do not match root level + or #, %% fast forward to the next level. case Words =:= [] of - true -> lookup_topic(Prefix); + true -> lookup_topic(Prefix, Trie); false -> [] - end ++ do_match(Words, Prefix); -do_match(Words) -> - do_match(Words, empty). + end ++ do_match(Words, Prefix, Trie); +do_match(Words, Trie) -> + do_match(Words, empty, Trie). -do_match(Words, Prefix) -> +do_match(Words, Prefix, Trie) -> case is_compact() of - true -> match_compact(Words, Prefix, false, []); - false -> match_no_compact(Words, Prefix, false, []) + true -> match_compact(Words, Prefix, Trie, false, []); + false -> match_no_compact(Words, Prefix, Trie, false, []) end. -match_no_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+ +match_no_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+ Acc; -match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - case has_prefix(Prefix) of +match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + case has_prefix(Prefix, Trie) of true -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1), - match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc); + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1), + match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc); false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix @@ -243,26 +253,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> Acc0 end. -match_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar +match_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar Acc; -match_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1), +match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1), WildcardPrefix = join(Prefix, '+'), %% go deeper to match current_prefix/+ only when: %% 1. current word is the last %% OR %% 2. there is a prefix = 'current_prefix/+' - case Words =:= [] orelse has_prefix(WildcardPrefix) of - true -> match_compact(Words, WildcardPrefix, true, Acc); + case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of + true -> match_compact(Words, WildcardPrefix, Trie, true, Acc); false -> Acc end. -'match_#'(Prefix) -> +'match_#'(Prefix, Trie) -> MlTopic = join(Prefix, '#'), - lookup_topic(MlTopic). + lookup_topic(MlTopic, Trie). is_compact() -> emqx:get_config([broker, perf, trie_compaction], true). diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index 00d64877c..769674abc 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -183,7 +183,7 @@ t_delete3(_) -> ?TRIE:delete(<<"sensor/+/unknown">>) end), ?assertEqual([], ?TRIE:match(<<"sensor">>)), - ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)). + ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)). clear_tables() -> emqx_trie:clear_tables().