diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 60dccd9a3..e01d0f663 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -26,6 +26,7 @@ -define(COMMON_SHARD, emqx_common_shard). -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(MOD_DELAYED_SHARD, emqx_delayed_shard). +-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). %%-------------------------------------------------------------------- %% Banner @@ -89,7 +90,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} + dest :: node() | {binary(), node()} | binary() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 40a92565b..010d8e5e5 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -38,6 +38,7 @@ , ?SHARED_SUB_SHARD , ?RULE_ENGINE_SHARD , ?MOD_DELAYED_SHARD + , ?PERSISTENT_SESSION_SHARD ]). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1248f9980..d6bdfeec5 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -205,6 +205,7 @@ publish(Msg) when is_record(Msg, message) -> ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]), []; Msg1 = #message{topic = Topic} -> + emqx_session_router:persist(Msg1), route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f5207b4ef..8bf7675c3 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -755,8 +755,14 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, session = Session, clientinfo = #{clientid := ClientId}}) -> - NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), - {ok, set_session(NSession, Channel)}; + Delivers1 = maybe_nack(Delivers), + Delivers2 = ignore_local(Delivers1, ClientId, Session), + NSession = emqx_session:enqueue(Delivers2, Session), + NChannel = set_session(NSession, Channel), + %% We consider queued/dropped messages as delivered since they are now in the session state. + MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers], + emqx_session_router:delivered(ClientId, MsgIds), + {ok, NChannel}; handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, @@ -766,10 +772,19 @@ handle_deliver(Delivers, Channel = #channel{takeover = true, {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{session = Session, - clientinfo = #{clientid := ClientId}}) -> + clientinfo = #{clientid := ClientId}, + conninfo = #{expiry_interval := ExpiryInterval} + }) -> case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> NChannel = set_session(NSession, Channel), + case ExpiryInterval > 0 of + true -> + MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers], + emqx_session_router:delivered(ClientId, MsgIds); + false -> + ignore + end, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); {ok, NSession} -> {ok, set_session(NSession, Channel)} @@ -1605,8 +1620,12 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> + pendings = Pendings, + clientinfo = #{clientid := ClientId}}) -> {ok, Publishes, Session1} = emqx_session:replay(Session), + %% We consider queued/dropped messages as delivered since they are now in the session state. + MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Pendings], + emqx_session_router:delivered(ClientId, MsgIds), case emqx_session:deliver(Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 8676c9a8a..99dc53320 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -228,13 +228,14 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> ResumeStart = fun(_) -> case takeover_session(ClientId) of {ok, Session} -> - %% TODO: Any messages in the mean time was lost. ok = emqx_session:resume(ClientInfo, Session), emqx_session:db_put(ClientId, EI, Session), + Pendings = [{deliver, emqx_message:topic(Msg), Msg} + || Msg <- emqx_session_router:pending(ClientId)], register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, - pendings => []}}; + pendings => Pendings}}; {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), emqx_session:db_put(ClientId, EI, Session), diff --git a/apps/emqx/src/emqx_router_sup.erl b/apps/emqx/src/emqx_router_sup.erl index 2b75ce27f..c8f2ae11c 100644 --- a/apps/emqx/src/emqx_router_sup.erl +++ b/apps/emqx/src/emqx_router_sup.erl @@ -34,7 +34,13 @@ init([]) -> type => worker, modules => [emqx_router_helper]}, %% Router pool - RouterPool = emqx_pool_sup:spec([router_pool, hash, + RouterPool = emqx_pool_sup:spec(router_pool, + [router_pool, hash, {emqx_router, start_link, []}]), - {ok, {{one_for_all, 0, 1}, [Helper, RouterPool]}}. + + %% TODO: Should this be optional? + SessionRouterPool = emqx_pool_sup:spec(session_router_pool, + [session_router_pool, hash, + {emqx_session_router, start_link, []}]), + {ok, {{one_for_all, 0, 1}, [Helper, RouterPool, SessionRouterPool]}}. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d4cbdae73..5f77ee660 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -227,8 +227,13 @@ db_put(SessionID, ExpiryInterval, #session{} = Session) when is_binary(SessionID , ts = erlang:system_time(millisecond) , session = Session}, case use_db_session(SS) of - false -> ekka_mnesia:dirty_delete(?SESSION_STORE, SessionID); - true -> ekka_mnesia:dirty_write(?SESSION_STORE, SS) + false -> clean_up_session(SessionID, Session); + true -> + %% TODO: Should check for changes in the subscriptions. + maps:foreach(fun(Topic, _) -> + emqx_session_router:do_add_route(Topic, SessionID) + end, Session#session.subscriptions), + ekka_mnesia:dirty_write(?SESSION_STORE, SS) end. db_get(SessionID) when is_binary(SessionID) -> @@ -249,6 +254,11 @@ use_db_session(#session_store{expiry_interval = 16#FFFFFFFF}) -> use_db_session(#session_store{expiry_interval = E, ts = TS}) -> E*1000 + TS > erlang:system_time(millisecond). +clean_up_session(SessionID, Session) -> + Fun = fun(Topic, _) -> emqx_session_router:do_delete_route(Topic, SessionID) end, + maps:foreach(Fun, Session#session.subscriptions), + ekka_mnesia:dirty_delete(?SESSION_STORE, SessionID). + %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- @@ -337,6 +347,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), + ClientID = maps:get(clientid, ClientInfo, undefined), + emqx_session_router:do_delete_route(ClientID, TopicFilter), ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> @@ -702,7 +714,8 @@ terminate(ClientInfo, discarded, Session) -> run_hook('session.discarded', [ClientInfo, info(Session)]); terminate(ClientInfo, takeovered, Session) -> run_hook('session.takeovered', [ClientInfo, info(Session)]); -terminate(ClientInfo, Reason, Session) -> +terminate(#{clientid := ClientId} = ClientInfo, Reason, Session) -> + clean_up_session(ClientId, Session), run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -compile({inline, [run_hook/2]}). diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl new file mode 100644 index 000000000..70bc6a73a --- /dev/null +++ b/apps/emqx/src/emqx_session_router.erl @@ -0,0 +1,392 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_session_router). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("types.hrl"). +-include_lib("ekka/include/ekka.hrl"). + +-logger_header("[Router]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([start_link/2]). + +%% Route APIs +-export([ do_add_route/2 + ]). + +-export([ delete_route/2 + , do_delete_route/2 + ]). + +-export([ match_routes/1 + , lookup_routes/1 + , has_routes/1 + ]). + +-export([ persist/1 + , delivered/2 + , pending/1 + ]). + +-export([print_routes/1]). + +-export([topics/0]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-type(group() :: binary()). + +-type(dest() :: node() | {group(), node()}). + +-define(ROUTE_TAB, emqx_session_route). +-define(SESS_MSG_TAB, emqx_session_msg). +-define(MSG_TAB, emqx_persistent_msg). + +%% NOTE: It is important that ?DELIVERED > ?UNDELIVERED because of traversal order +-define(DELIVERED, 1). +-define(UNDELIVERED, 0). +-type pending_tag() :: ?DELIVERED | ?UNDELIVERED. +-record(session_msg, {key :: {binary(), emqx_guid:guid(), pending_tag()}, + val = [] :: []}). + +-rlog_shard({?PERSISTENT_SESSION_SHARD, ?ROUTE_TAB}). +-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESS_MSG_TAB}). +-rlog_shard({?PERSISTENT_SESSION_SHARD, ?MSG_TAB}). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?ROUTE_TAB, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + ok = ekka_mnesia:create_table(?SESS_MSG_TAB, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + %% TODO: This should be external + ok = ekka_mnesia:create_table(?MSG_TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies), + ok = ekka_mnesia:copy_table(?SESS_MSG_TAB, ram_copies), + %% TODO: This should be external + ok = ekka_mnesia:copy_table(?MSG_TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% Start a router +%%-------------------------------------------------------------------- + +-spec(start_link(atom(), pos_integer()) -> startlink_ret()). +start_link(Pool, Id) -> + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + +%%-------------------------------------------------------------------- +%% Route APIs +%%-------------------------------------------------------------------- + +-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). +do_add_route(Topic, SessionID) when is_binary(Topic) -> + Route = #route{topic = Topic, dest = SessionID}, + case lists:member(Route, lookup_routes(Topic)) of + true -> ok; + false -> + case emqx_topic:wildcard(Topic) of + true -> maybe_trans(fun insert_trie_route/1, [Route]); + false -> insert_direct_route(Route) + end + end. + +%% @doc Match routes +-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). +match_routes(Topic) when is_binary(Topic) -> + case match_trie(Topic) of + [] -> lookup_routes(Topic); + Matched -> + lists:append([lookup_routes(To) || To <- [Topic | Matched]]) + end. + +%% Optimize: routing table will be replicated to all router nodes. +match_trie(Topic) -> + case emqx_trie:empty_session() of + true -> []; + false -> emqx_trie:match_session(Topic) + end. + +-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). +lookup_routes(Topic) -> + ets:lookup(?ROUTE_TAB, Topic). + +-spec(has_routes(emqx_topic:topic()) -> boolean()). +has_routes(Topic) when is_binary(Topic) -> + ets:member(?ROUTE_TAB, Topic). + +-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). +delete_route(Topic, SessionID) when is_binary(Topic), is_binary(SessionID) -> + call(pick(Topic), {delete_route, Topic, SessionID}). + +-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). +do_delete_route(Topic, SessionID) -> + Route = #route{topic = Topic, dest = SessionID}, + case emqx_topic:wildcard(Topic) of + true -> maybe_trans(fun delete_trie_route/1, [Route]); + false -> delete_direct_route(Route) + end. + +-spec(topics() -> list(emqx_topic:topic())). +topics() -> + mnesia:dirty_all_keys(?ROUTE_TAB). + +%% @doc Print routes to a topic +-spec(print_routes(emqx_topic:topic()) -> ok). +print_routes(Topic) -> + lists:foreach(fun(#route{topic = To, dest = SessionID}) -> + io:format("~s -> ~s~n", [To, SessionID]) + end, match_routes(Topic)). + +%%-------------------------------------------------------------------- +%% Message APIs +%%-------------------------------------------------------------------- + +persist(Msg) -> + case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of + true -> ok; + false -> + case match_routes(emqx_message:topic(Msg)) of + [] -> ok; + Routes -> + mnesia:dirty_write(?MSG_TAB, Msg), + Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end, + lists:foreach(Fun, Routes) + end + end. + +delivered(SessionID, MsgIds) -> + cast(pick(SessionID), {delivered, SessionID, MsgIds}). + +pending(SessionID) -> + call(pick(SessionID), {pending, SessionID}). + +call(Router, Msg) -> + gen_server:call(Router, Msg, infinity). + +cast(Router, Msg) -> + gen_server:cast(Router, Msg). + +pick(#route{dest = SessionID}) -> + gproc_pool:pick_worker(session_router_pool, SessionID); +pick(SessionID) when is_binary(SessionID) -> + gproc_pool:pick_worker(session_router_pool, SessionID). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{pool => Pool, id => Id}}. + +handle_call({pending, SessionID}, _From, State) -> + {reply, pending_messages(SessionID), State}; +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast({persist, #route{dest = SessionID}, Msg}, State) -> + Key = {SessionID, emqx_message:id(Msg), ?UNDELIVERED}, + ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }), + {noreply, State}; +handle_cast({delivered, SessionID, MsgIDs}, State) -> + Fun = fun(MsgID) -> + Key = {SessionID, MsgID, ?DELIVERED}, + ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }) + end, + lists:foreach(Fun, MsgIDs), + {noreply, State}; +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{pool := Pool, id := Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +pending_messages(SessionID) -> + %% TODO: The reading of messages should be from external DB + Fun = fun() -> [hd(mnesia:read(?MSG_TAB, MsgId)) + || MsgId <- pending_messages(SessionID, <<>>, ?DELIVERED, [])] + end, + {atomic, Msgs} = ekka_mnesia:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Msgs. + +%% The keys are ordered by +%% {sessionID(), emqx_guid:guid(), ?DELIVERED | ?UNDELIVERED} +%% where +%% emqx_guid:guid() is ordered in ts() and by node() +%% ?UNDELIVERED < ?DELIVERED +%% +%% We traverse the table until we reach another session. +%% TODO: Garbage collect the delivered messages. +pending_messages(SessionID, PrevMsgId, PrevTag, Acc) -> + case mnesia:dirty_next(?SESS_MSG_TAB, {SessionID, PrevMsgId, PrevTag}) of + {S, MsgId, Tag} = Key when S =:= SessionID, MsgId =:= PrevMsgId -> + Tag =:= ?UNDELIVERED andalso error({assert_fail}, Key), + pending_messages(SessionID, MsgId, Tag, Acc); + {S, MsgId, Tag} when S =:= SessionID -> + case PrevTag of + ?DELIVERED -> pending_messages(SessionID, MsgId, Tag, Acc); + ?UNDELIVERED -> pending_messages(SessionID, MsgId, Tag, [PrevMsgId|Acc]) + end; + _ -> %% Next sessionID or '$end_of_table' + case PrevTag of + ?DELIVERED -> lists:reverse(Acc); + ?UNDELIVERED -> lists:reverse([PrevMsgId|Acc]) + end + end. + +insert_direct_route(Route) -> + ekka_mnesia:dirty_write(?ROUTE_TAB, Route). + +insert_trie_route(Route = #route{topic = Topic}) -> + case mnesia:wread({?ROUTE_TAB, Topic}) of + [] -> emqx_trie:insert_session(Topic); + _ -> ok + end, + mnesia:write(?ROUTE_TAB, Route, sticky_write). + +delete_direct_route(Route) -> + mnesia:dirty_delete_object(?ROUTE_TAB, Route). + +delete_trie_route(Route = #route{topic = Topic}) -> + case mnesia:wread({?ROUTE_TAB, Topic}) of + [Route] -> %% Remove route and trie + ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), + emqx_trie:delete(Topic); + [_|_] -> %% Remove route only + mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); + [] -> ok + end. + +%% @private +-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). +maybe_trans(Fun, Args) -> + case persistent_term:get(emqx_route_lock_type) of + key -> + trans(Fun, Args); + global -> + %% Assert: + mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash + lock_router(), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router() + end; + tab -> + trans(fun() -> + emqx_trie:lock_session_tables(), + apply(Fun, Args) + end, []) + end. + +%% The created fun only terminates with explicit exception +-dialyzer({nowarn_function, [trans/2]}). + +-spec(trans(function(), list(any())) -> ok | {error, term()}). +trans(Fun, Args) -> + {WPid, RefMon} = + spawn_monitor( + %% NOTE: this is under the assumption that crashes in Fun + %% are caught by mnesia:transaction/2. + %% Future changes should keep in mind that this process + %% always exit with database write result. + fun() -> + Res = case ekka_mnesia:transaction(?PERSISTENT_SESSION_SHARD, Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end, + exit({shutdown, Res}) + end), + %% Receive a 'shutdown' exit to pass result from the short-lived process. + %% so the receive below can be receive-mark optimized by the compiler. + %% + %% If the result is sent as a regular message, we'll have to + %% either demonitor (with flush which is essentially a 'receive' since + %% the process is no longer alive after the result has been received), + %% or use a plain 'receive' to drain the normal 'DOWN' message. + %% However the compiler does not optimize this second 'receive'. + receive + {'DOWN', RefMon, process, WPid, Info} -> + case Info of + {shutdown, Result} -> Result; + _ -> {error, {trans_crash, Info}} + end + end. + +lock_router() -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(); + true -> + ok + end. + +unlock_router() -> + global:del_lock({?MODULE, self()}). diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 32c176b65..3469c0f5e 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -26,12 +26,17 @@ %% Trie APIs -export([ insert/1 + , insert_session/1 , match/1 + , match_session/1 , delete/1 + , delete_session/1 ]). -export([ empty/0 + , empty_session/0 , lock_tables/0 + , lock_session_tables/0 ]). -export([is_compact/0, set_compact/1]). @@ -42,6 +47,7 @@ -endif. -define(TRIE, emqx_trie). +-define(SESSION_TRIE, emqx_session_trie). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -64,6 +70,12 @@ mnesia(boot) -> {write_concurrency, true} ]}], ok = ekka_mnesia:create_table(?TRIE, [ + {ram_copies, [node()]}, + {record_name, ?TRIE}, + {attributes, record_info(fields, ?TRIE)}, + {type, ordered_set}, + {storage_properties, StoreProps}]), + ok = ekka_mnesia:create_table(?SESSION_TRIE, [ {ram_copies, [node()]}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, @@ -71,6 +83,7 @@ mnesia(boot) -> {storage_properties, StoreProps}]); mnesia(copy) -> %% Copy topics table + ok = ekka_mnesia:copy_table(?SESSION_TRIE, ram_copies), ok = ekka_mnesia:copy_table(?TRIE, ram_copies). %%-------------------------------------------------------------------- @@ -80,24 +93,46 @@ mnesia(copy) -> %% @doc Insert a topic filter into the trie. -spec(insert(emqx_topic:topic()) -> ok). insert(Topic) when is_binary(Topic) -> + insert(Topic, ?TRIE). + +-spec(insert_session(emqx_topic:topic()) -> ok). +insert_session(Topic) when is_binary(Topic) -> + insert(Topic, ?SESSION_TRIE). + +insert(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case mnesia:wread({?TRIE, TopicKey}) of + case mnesia:wread({Trie, TopicKey}) of [_] -> ok; %% already inserted - [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys]) + [] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys]) end. %% @doc Delete a topic filter from the trie. -spec(delete(emqx_topic:topic()) -> ok). delete(Topic) when is_binary(Topic) -> + delete(Topic, ?TRIE). + +%% @doc Delete a topic filter from the trie. +-spec(delete_session(emqx_topic:topic()) -> ok). +delete_session(Topic) when is_binary(Topic) -> + delete(Topic, ?SESSION_TRIE). + +delete(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case [] =/= mnesia:wread({?TRIE, TopicKey}) of - true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]); + case [] =/= mnesia:wread({Trie, TopicKey}) of + true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]); false -> ok end. %% @doc Find trie nodes that matchs the topic name. -spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())). match(Topic) when is_binary(Topic) -> + match(Topic, ?TRIE). + +-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())). +match_session(Topic) when is_binary(Topic) -> + match(Topic, ?SESSION_TRIE). + +match(Topic, Trie) when is_binary(Topic) -> Words = emqx_topic:words(Topic), case emqx_topic:wildcard(Words) of true -> @@ -110,17 +145,26 @@ match(Topic) when is_binary(Topic) -> %% Such clients will get disconnected. []; false -> - do_match(Words) + do_match(Words, Trie) end. %% @doc Is the trie empty? -spec(empty() -> boolean()). -empty() -> ets:first(?TRIE) =:= '$end_of_table'. +empty() -> empty(?TRIE). + +empty_session() -> + empty(?SESSION_TRIE). + +empty(Trie) -> ets:first(Trie) =:= '$end_of_table'. -spec lock_tables() -> ok. lock_tables() -> mnesia:write_lock_table(?TRIE). +-spec lock_session_tables() -> ok. +lock_session_tables() -> + mnesia:write_lock_table(?SESSION_TRIE). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -168,70 +212,70 @@ make_prefixes([H | T], Prefix0, Acc0) -> Acc = [Prefix | Acc0], make_prefixes(T, Prefix, Acc). -insert_key(Key) -> - T = case mnesia:wread({?TRIE, Key}) of +insert_key(Key, Trie) -> + T = case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T1] -> T1#?TRIE{count = C + 1}; [] -> #?TRIE{key = Key, count = 1} end, - ok = mnesia:write(T). + ok = mnesia:write(Trie, T, write). -delete_key(Key) -> - case mnesia:wread({?TRIE, Key}) of +delete_key(Key, Trie) -> + case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T] when C > 1 -> - ok = mnesia:write(T#?TRIE{count = C - 1}); + ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write); [_] -> - ok = mnesia:delete(?TRIE, Key, write); + ok = mnesia:delete(Trie, Key, write); [] -> ok end. %% micro-optimization: no need to lookup when topic is not wildcard %% because we only insert wildcards to emqx_trie -lookup_topic(_Topic, false) -> []; -lookup_topic(Topic, true) -> lookup_topic(Topic). +lookup_topic(_Topic,_Trie, false) -> []; +lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie). -lookup_topic(Topic) when is_binary(Topic) -> - case ets:lookup(?TRIE, ?TOPIC(Topic)) of +lookup_topic(Topic, Trie) when is_binary(Topic) -> + case ets:lookup(Trie, ?TOPIC(Topic)) of [#?TRIE{count = C}] -> [Topic || C > 0]; [] -> [] end. -has_prefix(empty) -> true; %% this is the virtual tree root -has_prefix(Prefix) -> - case ets:lookup(?TRIE, ?PREFIX(Prefix)) of +has_prefix(empty, _Trie) -> true; %% this is the virtual tree root +has_prefix(Prefix, Trie) -> + case ets:lookup(Trie, ?PREFIX(Prefix)) of [#?TRIE{count = C}] -> C > 0; [] -> false end. -do_match([<<"$", _/binary>> = Prefix | Words]) -> +do_match([<<"$", _/binary>> = Prefix | Words], Trie) -> %% For topics having dollar sign prefix, %% we do not match root level + or #, %% fast forward to the next level. case Words =:= [] of - true -> lookup_topic(Prefix); + true -> lookup_topic(Prefix, Trie); false -> [] - end ++ do_match(Words, Prefix); -do_match(Words) -> - do_match(Words, empty). + end ++ do_match(Words, Prefix, Trie); +do_match(Words, Trie) -> + do_match(Words, empty, Trie). -do_match(Words, Prefix) -> +do_match(Words, Prefix, Trie) -> case is_compact() of - true -> match_compact(Words, Prefix, false, []); - false -> match_no_compact(Words, Prefix, false, []) + true -> match_compact(Words, Prefix, Trie, false, []); + false -> match_no_compact(Words, Prefix, Trie, false, []) end. -match_no_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+ +match_no_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+ Acc; -match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - case has_prefix(Prefix) of +match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + case has_prefix(Prefix, Trie) of true -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1), - match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc); + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1), + match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc); false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix @@ -248,26 +292,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> Acc0 end. -match_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar +match_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar Acc; -match_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1), +match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1), WildcardPrefix = join(Prefix, '+'), %% go deeper to match current_prefix/+ only when: %% 1. current word is the last %% OR %% 2. there is a prefix = 'current_prefix/+' - case Words =:= [] orelse has_prefix(WildcardPrefix) of - true -> match_compact(Words, WildcardPrefix, true, Acc); + case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of + true -> match_compact(Words, WildcardPrefix, Trie, true, Acc); false -> Acc end. -'match_#'(Prefix) -> +'match_#'(Prefix, Trie) -> MlTopic = join(Prefix, '#'), - lookup_topic(MlTopic). + lookup_topic(MlTopic, Trie). is_compact() -> emqx_config:get([broker, perf, trie_compaction], true). diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index 51106f529..b70c17c48 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -186,7 +186,7 @@ t_delete3(_) -> ?TRIE:delete(<<"sensor/+/unknown">>) end), ?assertEqual([], ?TRIE:match(<<"sensor">>)), - ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)). + ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)). clear_tables() -> emqx_trie:clear_tables().