feat: Add session router and persist messages to persistent subscribers

The session routing is keeping track of the subscriptions of persistent sessions.
This commit is contained in:
Tobias Lindahl 2021-06-23 10:13:00 +02:00
parent 5d4645b774
commit 6497dc30b7
10 changed files with 537 additions and 59 deletions

View File

@ -26,6 +26,7 @@
-define(COMMON_SHARD, emqx_common_shard).
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
%%--------------------------------------------------------------------
%% Banner
@ -89,7 +90,7 @@
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()}
dest :: node() | {binary(), node()} | binary()
}).
%%--------------------------------------------------------------------

View File

@ -38,6 +38,7 @@
, ?SHARED_SUB_SHARD
, ?RULE_ENGINE_SHARD
, ?MOD_DELAYED_SHARD
, ?PERSISTENT_SESSION_SHARD
]).

View File

@ -205,6 +205,7 @@ publish(Msg) when is_record(Msg, message) ->
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
[];
Msg1 = #message{topic = Topic} ->
emqx_session_router:persist(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end.

View File

@ -755,8 +755,14 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
{ok, set_session(NSession, Channel)};
Delivers1 = maybe_nack(Delivers),
Delivers2 = ignore_local(Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(Delivers2, Session),
NChannel = set_session(NSession, Channel),
%% We consider queued/dropped messages as delivered since they are now in the session state.
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers],
emqx_session_router:delivered(ClientId, MsgIds),
{ok, NChannel};
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings,
@ -766,10 +772,19 @@ handle_deliver(Delivers, Channel = #channel{takeover = true,
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{session = Session,
clientinfo = #{clientid := ClientId}}) ->
clientinfo = #{clientid := ClientId},
conninfo = #{expiry_interval := ExpiryInterval}
}) ->
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
{ok, Publishes, NSession} ->
NChannel = set_session(NSession, Channel),
case ExpiryInterval > 0 of
true ->
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers],
emqx_session_router:delivered(ClientId, MsgIds);
false ->
ignore
end,
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
{ok, NSession} ->
{ok, set_session(NSession, Channel)}
@ -1605,8 +1620,12 @@ maybe_resume_session(#channel{resuming = false}) ->
ignore;
maybe_resume_session(#channel{session = Session,
resuming = true,
pendings = Pendings}) ->
pendings = Pendings,
clientinfo = #{clientid := ClientId}}) ->
{ok, Publishes, Session1} = emqx_session:replay(Session),
%% We consider queued/dropped messages as delivered since they are now in the session state.
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Pendings],
emqx_session_router:delivered(ClientId, MsgIds),
case emqx_session:deliver(Pendings, Session1) of
{ok, Session2} ->
{ok, Publishes, Session2};

View File

@ -228,13 +228,14 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
ResumeStart = fun(_) ->
case takeover_session(ClientId) of
{ok, Session} ->
%% TODO: Any messages in the mean time was lost.
ok = emqx_session:resume(ClientInfo, Session),
emqx_session:db_put(ClientId, EI, Session),
Pendings = [{deliver, emqx_message:topic(Msg), Msg}
|| Msg <- emqx_session_router:pending(ClientId)],
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => []}};
pendings => Pendings}};
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
emqx_session:db_put(ClientId, EI, Session),

View File

@ -34,7 +34,13 @@ init([]) ->
type => worker,
modules => [emqx_router_helper]},
%% Router pool
RouterPool = emqx_pool_sup:spec([router_pool, hash,
RouterPool = emqx_pool_sup:spec(router_pool,
[router_pool, hash,
{emqx_router, start_link, []}]),
{ok, {{one_for_all, 0, 1}, [Helper, RouterPool]}}.
%% TODO: Should this be optional?
SessionRouterPool = emqx_pool_sup:spec(session_router_pool,
[session_router_pool, hash,
{emqx_session_router, start_link, []}]),
{ok, {{one_for_all, 0, 1}, [Helper, RouterPool, SessionRouterPool]}}.

View File

@ -227,8 +227,13 @@ db_put(SessionID, ExpiryInterval, #session{} = Session) when is_binary(SessionID
, ts = erlang:system_time(millisecond)
, session = Session},
case use_db_session(SS) of
false -> ekka_mnesia:dirty_delete(?SESSION_STORE, SessionID);
true -> ekka_mnesia:dirty_write(?SESSION_STORE, SS)
false -> clean_up_session(SessionID, Session);
true ->
%% TODO: Should check for changes in the subscriptions.
maps:foreach(fun(Topic, _) ->
emqx_session_router:do_add_route(Topic, SessionID)
end, Session#session.subscriptions),
ekka_mnesia:dirty_write(?SESSION_STORE, SS)
end.
db_get(SessionID) when is_binary(SessionID) ->
@ -249,6 +254,11 @@ use_db_session(#session_store{expiry_interval = 16#FFFFFFFF}) ->
use_db_session(#session_store{expiry_interval = E, ts = TS}) ->
E*1000 + TS > erlang:system_time(millisecond).
clean_up_session(SessionID, Session) ->
Fun = fun(Topic, _) -> emqx_session_router:do_delete_route(Topic, SessionID) end,
maps:foreach(Fun, Session#session.subscriptions),
ekka_mnesia:dirty_delete(?SESSION_STORE, SessionID).
%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
@ -337,6 +347,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions
case maps:find(TopicFilter, Subs) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(TopicFilter),
ClientID = maps:get(clientid, ClientInfo, undefined),
emqx_session_router:do_delete_route(ClientID, TopicFilter),
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
{ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
error ->
@ -702,7 +714,8 @@ terminate(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
terminate(ClientInfo, takeovered, Session) ->
run_hook('session.takeovered', [ClientInfo, info(Session)]);
terminate(ClientInfo, Reason, Session) ->
terminate(#{clientid := ClientId} = ClientInfo, Reason, Session) ->
clean_up_session(ClientId, Session),
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
-compile({inline, [run_hook/2]}).

View File

@ -0,0 +1,392 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_session_router).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("ekka/include/ekka.hrl").
-logger_header("[Router]").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([start_link/2]).
%% Route APIs
-export([ do_add_route/2
]).
-export([ delete_route/2
, do_delete_route/2
]).
-export([ match_routes/1
, lookup_routes/1
, has_routes/1
]).
-export([ persist/1
, delivered/2
, pending/1
]).
-export([print_routes/1]).
-export([topics/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-type(group() :: binary()).
-type(dest() :: node() | {group(), node()}).
-define(ROUTE_TAB, emqx_session_route).
-define(SESS_MSG_TAB, emqx_session_msg).
-define(MSG_TAB, emqx_persistent_msg).
%% NOTE: It is important that ?DELIVERED > ?UNDELIVERED because of traversal order
-define(DELIVERED, 1).
-define(UNDELIVERED, 0).
-type pending_tag() :: ?DELIVERED | ?UNDELIVERED.
-record(session_msg, {key :: {binary(), emqx_guid:guid(), pending_tag()},
val = [] :: []}).
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?ROUTE_TAB}).
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESS_MSG_TAB}).
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?MSG_TAB}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTE_TAB, [
{type, bag},
{ram_copies, [node()]},
{record_name, route},
{attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = ekka_mnesia:create_table(?SESS_MSG_TAB, [
{type, ordered_set},
{ram_copies, [node()]},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
%% TODO: This should be external
ok = ekka_mnesia:create_table(?MSG_TAB, [
{type, set},
{ram_copies, [node()]},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies),
ok = ekka_mnesia:copy_table(?SESS_MSG_TAB, ram_copies),
%% TODO: This should be external
ok = ekka_mnesia:copy_table(?MSG_TAB, ram_copies).
%%--------------------------------------------------------------------
%% Start a router
%%--------------------------------------------------------------------
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%%--------------------------------------------------------------------
%% Route APIs
%%--------------------------------------------------------------------
-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
do_add_route(Topic, SessionID) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = SessionID},
case lists:member(Route, lookup_routes(Topic)) of
true -> ok;
false ->
case emqx_topic:wildcard(Topic) of
true -> maybe_trans(fun insert_trie_route/1, [Route]);
false -> insert_direct_route(Route)
end
end.
%% @doc Match routes
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
match_routes(Topic) when is_binary(Topic) ->
case match_trie(Topic) of
[] -> lookup_routes(Topic);
Matched ->
lists:append([lookup_routes(To) || To <- [Topic | Matched]])
end.
%% Optimize: routing table will be replicated to all router nodes.
match_trie(Topic) ->
case emqx_trie:empty_session() of
true -> [];
false -> emqx_trie:match_session(Topic)
end.
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]).
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
-spec(has_routes(emqx_topic:topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE_TAB, Topic).
-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
delete_route(Topic, SessionID) when is_binary(Topic), is_binary(SessionID) ->
call(pick(Topic), {delete_route, Topic, SessionID}).
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
do_delete_route(Topic, SessionID) ->
Route = #route{topic = Topic, dest = SessionID},
case emqx_topic:wildcard(Topic) of
true -> maybe_trans(fun delete_trie_route/1, [Route]);
false -> delete_direct_route(Route)
end.
-spec(topics() -> list(emqx_topic:topic())).
topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB).
%% @doc Print routes to a topic
-spec(print_routes(emqx_topic:topic()) -> ok).
print_routes(Topic) ->
lists:foreach(fun(#route{topic = To, dest = SessionID}) ->
io:format("~s -> ~s~n", [To, SessionID])
end, match_routes(Topic)).
%%--------------------------------------------------------------------
%% Message APIs
%%--------------------------------------------------------------------
persist(Msg) ->
case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of
true -> ok;
false ->
case match_routes(emqx_message:topic(Msg)) of
[] -> ok;
Routes ->
mnesia:dirty_write(?MSG_TAB, Msg),
Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end,
lists:foreach(Fun, Routes)
end
end.
delivered(SessionID, MsgIds) ->
cast(pick(SessionID), {delivered, SessionID, MsgIds}).
pending(SessionID) ->
call(pick(SessionID), {pending, SessionID}).
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
cast(Router, Msg) ->
gen_server:cast(Router, Msg).
pick(#route{dest = SessionID}) ->
gproc_pool:pick_worker(session_router_pool, SessionID);
pick(SessionID) when is_binary(SessionID) ->
gproc_pool:pick_worker(session_router_pool, SessionID).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
handle_call({pending, SessionID}, _From, State) ->
{reply, pending_messages(SessionID), State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({persist, #route{dest = SessionID}, Msg}, State) ->
Key = {SessionID, emqx_message:id(Msg), ?UNDELIVERED},
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }),
{noreply, State};
handle_cast({delivered, SessionID, MsgIDs}, State) ->
Fun = fun(MsgID) ->
Key = {SessionID, MsgID, ?DELIVERED},
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
end,
lists:foreach(Fun, MsgIDs),
{noreply, State};
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
pending_messages(SessionID) ->
%% TODO: The reading of messages should be from external DB
Fun = fun() -> [hd(mnesia:read(?MSG_TAB, MsgId))
|| MsgId <- pending_messages(SessionID, <<>>, ?DELIVERED, [])]
end,
{atomic, Msgs} = ekka_mnesia:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Msgs.
%% The keys are ordered by
%% {sessionID(), emqx_guid:guid(), ?DELIVERED | ?UNDELIVERED}
%% where
%% emqx_guid:guid() is ordered in ts() and by node()
%% ?UNDELIVERED < ?DELIVERED
%%
%% We traverse the table until we reach another session.
%% TODO: Garbage collect the delivered messages.
pending_messages(SessionID, PrevMsgId, PrevTag, Acc) ->
case mnesia:dirty_next(?SESS_MSG_TAB, {SessionID, PrevMsgId, PrevTag}) of
{S, MsgId, Tag} = Key when S =:= SessionID, MsgId =:= PrevMsgId ->
Tag =:= ?UNDELIVERED andalso error({assert_fail}, Key),
pending_messages(SessionID, MsgId, Tag, Acc);
{S, MsgId, Tag} when S =:= SessionID ->
case PrevTag of
?DELIVERED -> pending_messages(SessionID, MsgId, Tag, Acc);
?UNDELIVERED -> pending_messages(SessionID, MsgId, Tag, [PrevMsgId|Acc])
end;
_ -> %% Next sessionID or '$end_of_table'
case PrevTag of
?DELIVERED -> lists:reverse(Acc);
?UNDELIVERED -> lists:reverse([PrevMsgId|Acc])
end
end.
insert_direct_route(Route) ->
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[] -> emqx_trie:insert_session(Topic);
_ -> ok
end,
mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) ->
mnesia:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
[Route] -> %% Remove route and trie
ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
emqx_trie:delete(Topic);
[_|_] -> %% Remove route only
mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
[] -> ok
end.
%% @private
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
maybe_trans(Fun, Args) ->
case persistent_term:get(emqx_route_lock_type) of
key ->
trans(Fun, Args);
global ->
%% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
unlock_router()
end;
tab ->
trans(fun() ->
emqx_trie:lock_session_tables(),
apply(Fun, Args)
end, [])
end.
%% The created fun only terminates with explicit exception
-dialyzer({nowarn_function, [trans/2]}).
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
{WPid, RefMon} =
spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2.
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case ekka_mnesia:transaction(?PERSISTENT_SESSION_SHARD, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
exit({shutdown, Res})
end),
%% Receive a 'shutdown' exit to pass result from the short-lived process.
%% so the receive below can be receive-mark optimized by the compiler.
%%
%% If the result is sent as a regular message, we'll have to
%% either demonitor (with flush which is essentially a 'receive' since
%% the process is no longer alive after the result has been received),
%% or use a plain 'receive' to drain the normal 'DOWN' message.
%% However the compiler does not optimize this second 'receive'.
receive
{'DOWN', RefMon, process, WPid, Info} ->
case Info of
{shutdown, Result} -> Result;
_ -> {error, {trans_crash, Info}}
end
end.
lock_router() ->
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
false ->
%% Force to sleep 1ms instead.
timer:sleep(1),
lock_router();
true ->
ok
end.
unlock_router() ->
global:del_lock({?MODULE, self()}).

View File

@ -26,12 +26,17 @@
%% Trie APIs
-export([ insert/1
, insert_session/1
, match/1
, match_session/1
, delete/1
, delete_session/1
]).
-export([ empty/0
, empty_session/0
, lock_tables/0
, lock_session_tables/0
]).
-export([is_compact/0, set_compact/1]).
@ -42,6 +47,7 @@
-endif.
-define(TRIE, emqx_trie).
-define(SESSION_TRIE, emqx_session_trie).
-define(PREFIX(Prefix), {Prefix, 0}).
-define(TOPIC(Topic), {Topic, 1}).
@ -64,6 +70,12 @@ mnesia(boot) ->
{write_concurrency, true}
]}],
ok = ekka_mnesia:create_table(?TRIE, [
{ram_copies, [node()]},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}]),
ok = ekka_mnesia:create_table(?SESSION_TRIE, [
{ram_copies, [node()]},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
@ -71,6 +83,7 @@ mnesia(boot) ->
{storage_properties, StoreProps}]);
mnesia(copy) ->
%% Copy topics table
ok = ekka_mnesia:copy_table(?SESSION_TRIE, ram_copies),
ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
%%--------------------------------------------------------------------
@ -80,24 +93,46 @@ mnesia(copy) ->
%% @doc Insert a topic filter into the trie.
-spec(insert(emqx_topic:topic()) -> ok).
insert(Topic) when is_binary(Topic) ->
insert(Topic, ?TRIE).
-spec(insert_session(emqx_topic:topic()) -> ok).
insert_session(Topic) when is_binary(Topic) ->
insert(Topic, ?SESSION_TRIE).
insert(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
case mnesia:wread({?TRIE, TopicKey}) of
case mnesia:wread({Trie, TopicKey}) of
[_] -> ok; %% already inserted
[] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
[] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys])
end.
%% @doc Delete a topic filter from the trie.
-spec(delete(emqx_topic:topic()) -> ok).
delete(Topic) when is_binary(Topic) ->
delete(Topic, ?TRIE).
%% @doc Delete a topic filter from the trie.
-spec(delete_session(emqx_topic:topic()) -> ok).
delete_session(Topic) when is_binary(Topic) ->
delete(Topic, ?SESSION_TRIE).
delete(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
case [] =/= mnesia:wread({?TRIE, TopicKey}) of
true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
case [] =/= mnesia:wread({Trie, TopicKey}) of
true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]);
false -> ok
end.
%% @doc Find trie nodes that matchs the topic name.
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
match(Topic) when is_binary(Topic) ->
match(Topic, ?TRIE).
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
match_session(Topic) when is_binary(Topic) ->
match(Topic, ?SESSION_TRIE).
match(Topic, Trie) when is_binary(Topic) ->
Words = emqx_topic:words(Topic),
case emqx_topic:wildcard(Words) of
true ->
@ -110,17 +145,26 @@ match(Topic) when is_binary(Topic) ->
%% Such clients will get disconnected.
[];
false ->
do_match(Words)
do_match(Words, Trie)
end.
%% @doc Is the trie empty?
-spec(empty() -> boolean()).
empty() -> ets:first(?TRIE) =:= '$end_of_table'.
empty() -> empty(?TRIE).
empty_session() ->
empty(?SESSION_TRIE).
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE).
-spec lock_session_tables() -> ok.
lock_session_tables() ->
mnesia:write_lock_table(?SESSION_TRIE).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -168,70 +212,70 @@ make_prefixes([H | T], Prefix0, Acc0) ->
Acc = [Prefix | Acc0],
make_prefixes(T, Prefix, Acc).
insert_key(Key) ->
T = case mnesia:wread({?TRIE, Key}) of
insert_key(Key, Trie) ->
T = case mnesia:wread({Trie, Key}) of
[#?TRIE{count = C} = T1] ->
T1#?TRIE{count = C + 1};
[] ->
#?TRIE{key = Key, count = 1}
end,
ok = mnesia:write(T).
ok = mnesia:write(Trie, T, write).
delete_key(Key) ->
case mnesia:wread({?TRIE, Key}) of
delete_key(Key, Trie) ->
case mnesia:wread({Trie, Key}) of
[#?TRIE{count = C} = T] when C > 1 ->
ok = mnesia:write(T#?TRIE{count = C - 1});
ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write);
[_] ->
ok = mnesia:delete(?TRIE, Key, write);
ok = mnesia:delete(Trie, Key, write);
[] ->
ok
end.
%% micro-optimization: no need to lookup when topic is not wildcard
%% because we only insert wildcards to emqx_trie
lookup_topic(_Topic, false) -> [];
lookup_topic(Topic, true) -> lookup_topic(Topic).
lookup_topic(_Topic,_Trie, false) -> [];
lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie).
lookup_topic(Topic) when is_binary(Topic) ->
case ets:lookup(?TRIE, ?TOPIC(Topic)) of
lookup_topic(Topic, Trie) when is_binary(Topic) ->
case ets:lookup(Trie, ?TOPIC(Topic)) of
[#?TRIE{count = C}] -> [Topic || C > 0];
[] -> []
end.
has_prefix(empty) -> true; %% this is the virtual tree root
has_prefix(Prefix) ->
case ets:lookup(?TRIE, ?PREFIX(Prefix)) of
has_prefix(empty, _Trie) -> true; %% this is the virtual tree root
has_prefix(Prefix, Trie) ->
case ets:lookup(Trie, ?PREFIX(Prefix)) of
[#?TRIE{count = C}] -> C > 0;
[] -> false
end.
do_match([<<"$", _/binary>> = Prefix | Words]) ->
do_match([<<"$", _/binary>> = Prefix | Words], Trie) ->
%% For topics having dollar sign prefix,
%% we do not match root level + or #,
%% fast forward to the next level.
case Words =:= [] of
true -> lookup_topic(Prefix);
true -> lookup_topic(Prefix, Trie);
false -> []
end ++ do_match(Words, Prefix);
do_match(Words) ->
do_match(Words, empty).
end ++ do_match(Words, Prefix, Trie);
do_match(Words, Trie) ->
do_match(Words, empty, Trie).
do_match(Words, Prefix) ->
do_match(Words, Prefix, Trie) ->
case is_compact() of
true -> match_compact(Words, Prefix, false, []);
false -> match_no_compact(Words, Prefix, false, [])
true -> match_compact(Words, Prefix, Trie, false, []);
false -> match_no_compact(Words, Prefix, Trie, false, [])
end.
match_no_compact([], Topic, IsWildcard, Acc) ->
'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
match_no_compact([], Topic, Trie, IsWildcard, Acc) ->
'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/#
lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+
Acc;
match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
case has_prefix(Prefix) of
match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
case has_prefix(Prefix, Trie) of
true ->
Acc1 = 'match_#'(Prefix) ++ Acc0,
Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1),
match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc);
false ->
%% non-compact paths in database
%% if there is no prefix matches the current topic prefix
@ -248,26 +292,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
Acc0
end.
match_compact([], Topic, IsWildcard, Acc) ->
'match_#'(Topic) ++ %% try match foo/bar/#
lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
match_compact([], Topic, Trie, IsWildcard, Acc) ->
'match_#'(Topic, Trie) ++ %% try match foo/bar/#
lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar
Acc;
match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
Acc1 = 'match_#'(Prefix) ++ Acc0,
Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1),
WildcardPrefix = join(Prefix, '+'),
%% go deeper to match current_prefix/+ only when:
%% 1. current word is the last
%% OR
%% 2. there is a prefix = 'current_prefix/+'
case Words =:= [] orelse has_prefix(WildcardPrefix) of
true -> match_compact(Words, WildcardPrefix, true, Acc);
case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of
true -> match_compact(Words, WildcardPrefix, Trie, true, Acc);
false -> Acc
end.
'match_#'(Prefix) ->
'match_#'(Prefix, Trie) ->
MlTopic = join(Prefix, '#'),
lookup_topic(MlTopic).
lookup_topic(MlTopic, Trie).
is_compact() ->
emqx_config:get([broker, perf, trie_compaction], true).

View File

@ -186,7 +186,7 @@ t_delete3(_) ->
?TRIE:delete(<<"sensor/+/unknown">>)
end),
?assertEqual([], ?TRIE:match(<<"sensor">>)),
?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)).
?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)).
clear_tables() -> emqx_trie:clear_tables().