refactor: move common router parts out

This commit is contained in:
Tobias Lindahl 2021-07-02 13:25:54 +02:00
parent 6497dc30b7
commit 1c274b15d2
4 changed files with 151 additions and 225 deletions

View File

@ -119,8 +119,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true ->
maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
end
end.
@ -165,8 +167,10 @@ do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
end.
-spec(topics() -> list(emqx_topic:topic())).
@ -219,100 +223,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
insert_direct_route(Route) ->
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[] -> emqx_trie:insert(Topic);
_ -> ok
end,
mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) ->
ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[Route] -> %% Remove route and trie
ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
emqx_trie:delete(Topic);
[_|_] -> %% Remove route only
mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
[] -> ok
end.
%% @private
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case emqx_config:get([broker, perf, route_lock_type]) of
key ->
trans(Fun, Args);
global ->
%% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [])
end.
%% The created fun only terminates with explicit exception
-dialyzer({nowarn_function, [trans/2]}).
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
{WPid, RefMon} =
spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2.
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
exit({shutdown, Res})
end),
%% Receive a 'shutdown' exit to pass result from the short-lived process.
%% so the receive below can be receive-mark optimized by the compiler.
%%
%% If the result is sent as a regular message, we'll have to
%% either demonitor (with flush which is essentially a 'receive' since
%% the process is no longer alive after the result has been received),
%% or use a plain 'receive' to drain the normal 'DOWN' message.
%% However the compiler does not optimize this second 'receive'.
receive
{'DOWN', RefMon, process, WPid, Info} ->
case Info of
{shutdown, Result} -> Result;
_ -> {error, {trans_crash, Info}}
end
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -0,0 +1,126 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_router_utils).
-include("emqx.hrl").
-export([ delete_direct_route/2
, delete_trie_route/2
, insert_direct_route/2
, insert_trie_route/2
, maybe_trans/3
]).
insert_direct_route(Tab, Route) ->
ekka_mnesia:dirty_write(Tab, Route).
insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic);
[] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic);
_ -> ok
end,
mnesia:write(RouteTab, Route, sticky_write).
delete_direct_route(RouteTab, Route) ->
ekka_mnesia:dirty_delete_object(RouteTab, Route).
delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[R] when R =:= Route ->
%% Remove route and trie
ok = mnesia:delete_object(RouteTab, Route, sticky_write),
case RouteTab of
emqx_route -> emqx_trie:delete(Topic);
emqx_session_route -> emqx_trie:delete_session(Topic)
end;
[_|_] ->
%% Remove route only
mnesia:delete_object(RouteTab, Route, sticky_write);
[] ->
ok
end.
%% @private
-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}).
maybe_trans(Fun, Args, Shard) ->
case emqx_config:get([broker, perf, route_lock_type]) of
key ->
trans(Fun, Args, Shard);
global ->
%% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(Shard),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router(Shard)
end;
tab ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, [], Shard)
end.
%% The created fun only terminates with explicit exception
-dialyzer({nowarn_function, [trans/3]}).
-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}).
trans(Fun, Args, Shard) ->
{WPid, RefMon} =
spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2.
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case ekka_mnesia:transaction(Shard, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
exit({shutdown, Res})
end),
%% Receive a 'shutdown' exit to pass result from the short-lived process.
%% so the receive below can be receive-mark optimized by the compiler.
%%
%% If the result is sent as a regular message, we'll have to
%% either demonitor (with flush which is essentially a 'receive' since
%% the process is no longer alive after the result has been received),
%% or use a plain 'receive' to drain the normal 'DOWN' message.
%% However the compiler does not optimize this second 'receive'.
receive
{'DOWN', RefMon, process, WPid, Info} ->
case Info of
{shutdown, Result} -> Result;
_ -> {error, {trans_crash, Info}}
end
end.
lock_router(Shard) ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router(Shard);
true ->
ok
end.
unlock_router(Shard) ->
global:del_lock({{?MODULE, Shard}, self()}).

View File

@ -35,15 +35,8 @@
%% Route APIs
-export([ do_add_route/2
]).
-export([ delete_route/2
, do_delete_route/2
]).
-export([ match_routes/1
, lookup_routes/1
, has_routes/1
, match_routes/1
]).
-export([ persist/1
@ -53,8 +46,6 @@
-export([print_routes/1]).
-export([topics/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
@ -136,8 +127,12 @@ do_add_route(Topic, SessionID) when is_binary(Topic) ->
true -> ok;
false ->
case emqx_topic:wildcard(Topic) of
true -> maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
true ->
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route],
?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
end
end.
@ -157,30 +152,17 @@ match_trie(Topic) ->
false -> emqx_trie:match_session(Topic)
end.
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]).
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
-spec(has_routes(emqx_topic:topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE_TAB, Topic).
-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
delete_route(Topic, SessionID) when is_binary(Topic), is_binary(SessionID) ->
call(pick(Topic), {delete_route, Topic, SessionID}).
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
do_delete_route(Topic, SessionID) ->
Route = #route{topic = Topic, dest = SessionID},
case emqx_topic:wildcard(Topic) of
true -> maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
true ->
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
end.
-spec(topics() -> list(emqx_topic:topic())).
topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB).
%% @doc Print routes to a topic
-spec(print_routes(emqx_topic:topic()) -> ok).
print_routes(Topic) ->
@ -199,7 +181,7 @@ persist(Msg) ->
case match_routes(emqx_message:topic(Msg)) of
[] -> ok;
Routes ->
mnesia:dirty_write(?MSG_TAB, Msg),
ekka_mnesia:dirty_write(?MSG_TAB, Msg),
Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end,
lists:foreach(Fun, Routes)
end
@ -265,6 +247,9 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
pending_messages(SessionID) ->
%% TODO: The reading of messages should be from external DB
Fun = fun() -> [hd(mnesia:read(?MSG_TAB, MsgId))
@ -297,96 +282,3 @@ pending_messages(SessionID, PrevMsgId, PrevTag, Acc) ->
?UNDELIVERED -> lists:reverse([PrevMsgId|Acc])
end
end.
insert_direct_route(Route) ->
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[] -> emqx_trie:insert_session(Topic);
_ -> ok
end,
mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) ->
mnesia:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[Route] -> %% Remove route and trie
ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
emqx_trie:delete(Topic);
[_|_] -> %% Remove route only
mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
[] -> ok
end.
%% @private
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type) of
key ->
trans(Fun, Args);
global ->
%% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_session_tables(),
apply(Fun, Args)
end, [])
end.
%% The created fun only terminates with explicit exception
-dialyzer({nowarn_function, [trans/2]}).
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
{WPid, RefMon} =
spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2.
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case ekka_mnesia:transaction(?PERSISTENT_SESSION_SHARD, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
exit({shutdown, Res})
end),
%% Receive a 'shutdown' exit to pass result from the short-lived process.
%% so the receive below can be receive-mark optimized by the compiler.
%%
%% If the result is sent as a regular message, we'll have to
%% either demonitor (with flush which is essentially a 'receive' since
%% the process is no longer alive after the result has been received),
%% or use a plain 'receive' to drain the normal 'DOWN' message.
%% However the compiler does not optimize this second 'receive'.
receive
{'DOWN', RefMon, process, WPid, Info} ->
case Info of
{shutdown, Result} -> Result;
_ -> {error, {trans_crash, Info}}
end
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -57,6 +57,7 @@
}).
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESSION_TRIE}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap