diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 1a5e344f2..a38af3d6b 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -119,8 +119,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun insert_trie_route/1, [Route]); - false -> insert_direct_route(Route) + Fun = fun emqx_router_utils:insert_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) end end. @@ -165,8 +167,10 @@ do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun delete_trie_route/1, [Route]); - false -> delete_direct_route(Route) + Fun = fun emqx_router_utils:delete_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) end. -spec(topics() -> list(emqx_topic:topic())). @@ -219,100 +223,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -insert_direct_route(Route) -> - ekka_mnesia:dirty_write(?ROUTE_TAB, Route). - -insert_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [] -> emqx_trie:insert(Topic); - _ -> ok - end, - mnesia:write(?ROUTE_TAB, Route, sticky_write). - -delete_direct_route(Route) -> - ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). - -delete_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [Route] -> %% Remove route and trie - ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), - emqx_trie:delete(Topic); - [_|_] -> %% Remove route only - mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); - [] -> ok - end. - -%% @private --spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). -maybe_trans(Fun, Args) -> - case emqx_config:get([broker, perf, route_lock_type]) of - key -> - trans(Fun, Args); - global -> - %% Assert: - mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash - lock_router(), - try mnesia:sync_dirty(Fun, Args) - after - unlock_router() - end; - tab -> - trans(fun() -> - emqx_trie:lock_tables(), - apply(Fun, Args) - end, []) - end. - -%% The created fun only terminates with explicit exception --dialyzer({nowarn_function, [trans/2]}). - --spec(trans(function(), list(any())) -> ok | {error, term()}). -trans(Fun, Args) -> - {WPid, RefMon} = - spawn_monitor( - %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mnesia:transaction/2. - %% Future changes should keep in mind that this process - %% always exit with database write result. - fun() -> - Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} - end, - exit({shutdown, Res}) - end), - %% Receive a 'shutdown' exit to pass result from the short-lived process. - %% so the receive below can be receive-mark optimized by the compiler. - %% - %% If the result is sent as a regular message, we'll have to - %% either demonitor (with flush which is essentially a 'receive' since - %% the process is no longer alive after the result has been received), - %% or use a plain 'receive' to drain the normal 'DOWN' message. - %% However the compiler does not optimize this second 'receive'. - receive - {'DOWN', RefMon, process, WPid, Info} -> - case Info of - {shutdown, Result} -> Result; - _ -> {error, {trans_crash, Info}} - end - end. - -lock_router() -> - %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. - %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. - case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of - false -> - %% Force to sleep 1ms instead. - timer:sleep(1), - lock_router(); - true -> - ok - end. - -unlock_router() -> - global:del_lock({?MODULE, self()}). diff --git a/apps/emqx/src/emqx_router_utils.erl b/apps/emqx/src/emqx_router_utils.erl new file mode 100644 index 000000000..3dd18f007 --- /dev/null +++ b/apps/emqx/src/emqx_router_utils.erl @@ -0,0 +1,126 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_utils). + +-include("emqx.hrl"). + +-export([ delete_direct_route/2 + , delete_trie_route/2 + , insert_direct_route/2 + , insert_trie_route/2 + , maybe_trans/3 + ]). + +insert_direct_route(Tab, Route) -> + ekka_mnesia:dirty_write(Tab, Route). + +insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic); + [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic); + _ -> ok + end, + mnesia:write(RouteTab, Route, sticky_write). + +delete_direct_route(RouteTab, Route) -> + ekka_mnesia:dirty_delete_object(RouteTab, Route). + +delete_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [R] when R =:= Route -> + %% Remove route and trie + ok = mnesia:delete_object(RouteTab, Route, sticky_write), + case RouteTab of + emqx_route -> emqx_trie:delete(Topic); + emqx_session_route -> emqx_trie:delete_session(Topic) + end; + [_|_] -> + %% Remove route only + mnesia:delete_object(RouteTab, Route, sticky_write); + [] -> + ok + end. + +%% @private +-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}). +maybe_trans(Fun, Args, Shard) -> + case emqx_config:get([broker, perf, route_lock_type]) of + key -> + trans(Fun, Args, Shard); + global -> + %% Assert: + mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash + lock_router(Shard), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router(Shard) + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, [], Shard) + end. + +%% The created fun only terminates with explicit exception +-dialyzer({nowarn_function, [trans/3]}). + +-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}). +trans(Fun, Args, Shard) -> + {WPid, RefMon} = + spawn_monitor( + %% NOTE: this is under the assumption that crashes in Fun + %% are caught by mnesia:transaction/2. + %% Future changes should keep in mind that this process + %% always exit with database write result. + fun() -> + Res = case ekka_mnesia:transaction(Shard, Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end, + exit({shutdown, Res}) + end), + %% Receive a 'shutdown' exit to pass result from the short-lived process. + %% so the receive below can be receive-mark optimized by the compiler. + %% + %% If the result is sent as a regular message, we'll have to + %% either demonitor (with flush which is essentially a 'receive' since + %% the process is no longer alive after the result has been received), + %% or use a plain 'receive' to drain the normal 'DOWN' message. + %% However the compiler does not optimize this second 'receive'. + receive + {'DOWN', RefMon, process, WPid, Info} -> + case Info of + {shutdown, Result} -> Result; + _ -> {error, {trans_crash, Info}} + end + end. + +lock_router(Shard) -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(Shard); + true -> + ok + end. + +unlock_router(Shard) -> + global:del_lock({{?MODULE, Shard}, self()}). diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 70bc6a73a..8e88a8244 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -35,15 +35,8 @@ %% Route APIs -export([ do_add_route/2 - ]). - --export([ delete_route/2 , do_delete_route/2 - ]). - --export([ match_routes/1 - , lookup_routes/1 - , has_routes/1 + , match_routes/1 ]). -export([ persist/1 @@ -53,8 +46,6 @@ -export([print_routes/1]). --export([topics/0]). - %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -136,8 +127,12 @@ do_add_route(Topic, SessionID) when is_binary(Topic) -> true -> ok; false -> case emqx_topic:wildcard(Topic) of - true -> maybe_trans(fun insert_trie_route/1, [Route]); - false -> insert_direct_route(Route) + true -> + Fun = fun emqx_router_utils:insert_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], + ?PERSISTENT_SESSION_SHARD); + false -> + emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) end end. @@ -157,30 +152,17 @@ match_trie(Topic) -> false -> emqx_trie:match_session(Topic) end. --spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). -lookup_routes(Topic) -> - ets:lookup(?ROUTE_TAB, Topic). - --spec(has_routes(emqx_topic:topic()) -> boolean()). -has_routes(Topic) when is_binary(Topic) -> - ets:member(?ROUTE_TAB, Topic). - --spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). -delete_route(Topic, SessionID) when is_binary(Topic), is_binary(SessionID) -> - call(pick(Topic), {delete_route, Topic, SessionID}). - -spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). do_delete_route(Topic, SessionID) -> Route = #route{topic = Topic, dest = SessionID}, case emqx_topic:wildcard(Topic) of - true -> maybe_trans(fun delete_trie_route/1, [Route]); - false -> delete_direct_route(Route) + true -> + Fun = fun emqx_router_utils:delete_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD); + false -> + emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) end. --spec(topics() -> list(emqx_topic:topic())). -topics() -> - mnesia:dirty_all_keys(?ROUTE_TAB). - %% @doc Print routes to a topic -spec(print_routes(emqx_topic:topic()) -> ok). print_routes(Topic) -> @@ -199,7 +181,7 @@ persist(Msg) -> case match_routes(emqx_message:topic(Msg)) of [] -> ok; Routes -> - mnesia:dirty_write(?MSG_TAB, Msg), + ekka_mnesia:dirty_write(?MSG_TAB, Msg), Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end, lists:foreach(Fun, Routes) end @@ -265,6 +247,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +lookup_routes(Topic) -> + ets:lookup(?ROUTE_TAB, Topic). + pending_messages(SessionID) -> %% TODO: The reading of messages should be from external DB Fun = fun() -> [hd(mnesia:read(?MSG_TAB, MsgId)) @@ -297,96 +282,3 @@ pending_messages(SessionID, PrevMsgId, PrevTag, Acc) -> ?UNDELIVERED -> lists:reverse([PrevMsgId|Acc]) end end. - -insert_direct_route(Route) -> - ekka_mnesia:dirty_write(?ROUTE_TAB, Route). - -insert_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [] -> emqx_trie:insert_session(Topic); - _ -> ok - end, - mnesia:write(?ROUTE_TAB, Route, sticky_write). - -delete_direct_route(Route) -> - mnesia:dirty_delete_object(?ROUTE_TAB, Route). - -delete_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [Route] -> %% Remove route and trie - ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), - emqx_trie:delete(Topic); - [_|_] -> %% Remove route only - mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); - [] -> ok - end. - -%% @private --spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). -maybe_trans(Fun, Args) -> - case persistent_term:get(emqx_route_lock_type) of - key -> - trans(Fun, Args); - global -> - %% Assert: - mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash - lock_router(), - try mnesia:sync_dirty(Fun, Args) - after - unlock_router() - end; - tab -> - trans(fun() -> - emqx_trie:lock_session_tables(), - apply(Fun, Args) - end, []) - end. - -%% The created fun only terminates with explicit exception --dialyzer({nowarn_function, [trans/2]}). - --spec(trans(function(), list(any())) -> ok | {error, term()}). -trans(Fun, Args) -> - {WPid, RefMon} = - spawn_monitor( - %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mnesia:transaction/2. - %% Future changes should keep in mind that this process - %% always exit with database write result. - fun() -> - Res = case ekka_mnesia:transaction(?PERSISTENT_SESSION_SHARD, Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} - end, - exit({shutdown, Res}) - end), - %% Receive a 'shutdown' exit to pass result from the short-lived process. - %% so the receive below can be receive-mark optimized by the compiler. - %% - %% If the result is sent as a regular message, we'll have to - %% either demonitor (with flush which is essentially a 'receive' since - %% the process is no longer alive after the result has been received), - %% or use a plain 'receive' to drain the normal 'DOWN' message. - %% However the compiler does not optimize this second 'receive'. - receive - {'DOWN', RefMon, process, WPid, Info} -> - case Info of - {shutdown, Result} -> Result; - _ -> {error, {trans_crash, Info}} - end - end. - -lock_router() -> - %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. - %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. - case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of - false -> - %% Force to sleep 1ms instead. - timer:sleep(1), - lock_router(); - true -> - ok - end. - -unlock_router() -> - global:del_lock({?MODULE, self()}). diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 3469c0f5e..295ef63a4 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -57,6 +57,7 @@ }). -rlog_shard({?ROUTE_SHARD, ?TRIE}). +-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESSION_TRIE}). %%-------------------------------------------------------------------- %% Mnesia bootstrap