From 0dbb7394163dd33b544ce91eb69d83350ff5a509 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 8 Dec 2014 13:04:03 +0800 Subject: [PATCH] merge 'slimrt' pubsub module --- apps/emqtt/include/emqtt.hrl | 5 + apps/emqtt/src/emqtt.app.src | 5 +- apps/emqtt/src/emqtt_client.erl | 10 +- apps/emqtt/src/emqtt_http.erl | 2 +- apps/emqtt/src/emqtt_pubsub.erl | 243 +++++++++++++++++++++++++++++-- apps/emqtt/src/emqtt_router.erl | 251 -------------------------------- apps/emqtt/src/emqtt_sup.erl | 1 - 7 files changed, 243 insertions(+), 274 deletions(-) delete mode 100644 apps/emqtt/src/emqtt_router.erl diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl index 0876c84f8..b543bde66 100644 --- a/apps/emqtt/include/emqtt.hrl +++ b/apps/emqtt/include/emqtt.hrl @@ -37,6 +37,8 @@ -define(QOS_1, 1). -define(QOS_2, 2). +-type qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0. + -record(mqtt_msg, { retain, qos, @@ -47,3 +49,6 @@ encoder }). +-type mqtt_msg() :: #mqtt_msg{}. + + diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index 864caf5b3..68c2c1fbd 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -3,10 +3,7 @@ {description, "Erlang MQTT Broker"}, {vsn, git}, {modules, []}, - {registered, [emqtt_auth, - emqtt_router, - emqtt_registry - ]}, + {registered, [ ]}, {applications, [kernel, stdlib]}, {mod, {emqtt_app, []}}, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 305c4237e..dc35f3149 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -273,7 +273,7 @@ process_request(?CONNECT, process_request(?PUBLISH, Frame=#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> - emqtt_router:publish(make_msg(Frame)), + emqtt_pubsub:publish(make_msg(Frame)), {ok, State}; process_request(?PUBLISH, @@ -281,7 +281,7 @@ process_request(?PUBLISH, fixed = #mqtt_frame_fixed{qos = ?QOS_1}, variable = #mqtt_frame_publish{message_id = MsgId}}, State=#state{socket=Sock}) -> - emqtt_router:publish(make_msg(Frame)), + emqtt_pubsub:publish(make_msg(Frame)), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; @@ -291,7 +291,7 @@ process_request(?PUBLISH, fixed = #mqtt_frame_fixed{qos = ?QOS_2}, variable = #mqtt_frame_publish{message_id = MsgId}}, State=#state{socket=Sock}) -> - emqtt_router:publish(make_msg(Frame)), + emqtt_pubsub:publish(make_msg(Frame)), put({msg, MsgId}, pubrec), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, variable = #mqtt_frame_publish{ message_id = MsgId}}), @@ -333,7 +333,7 @@ process_request(?SUBSCRIBE, payload = undefined}, #state{socket=Sock} = State) -> - [emqtt_router:subscribe({Name, Qos}, self()) || + [emqtt_pubsub:subscribe({Name, Qos}, self()) || #mqtt_topic{name=Name, qos=Qos} <- Topics], GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], @@ -352,7 +352,7 @@ process_request(?UNSUBSCRIBE, payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> - [emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, variable = #mqtt_frame_suback{message_id = MessageId }}), diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index 80a009cc5..63de4eaac 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -46,7 +46,7 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), Qos = list_to_integer(get_value("qos", Params, "0")), %TODO: DUP, RETAIN... - emqtt_router:publish(Topic, #mqtt_msg { + emqtt_pubsub:publish(#mqtt_msg { retain = 0, qos = Qos, topic = Topic, diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index d61153e67..ed5e93fee 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -22,9 +22,13 @@ -module(emqtt_pubsub). --behaviour(gen_server). +-include("emqtt.hrl"). --define(SERVER, ?MODULE). +-include("emqtt_log.hrl"). + +-include("emqtt_internal.hrl"). + +-include_lib("stdlib/include/qlc.hrl"). %% ------------------------------------------------------------------ %% API Function Exports @@ -32,35 +36,155 @@ -export([start_link/0]). +-export([topics/0, + subscribe/2, + unsubscribe/2, + publish/1, + publish/2, + %local node + dispatch/2, + match/1]). + %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). %% ------------------------------------------------------------------ %% API Function Definitions %% ------------------------------------------------------------------ +%% +%% @doc Start Pubsub. +%% start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% +%% @doc All topics +%% +-spec topics() -> list(topic()). +topics() -> + mnesia:dirty_all_keys(topic). + +%% +%% @doc Subscribe Topic +%% +-spec subscribe({Topic :: binary(), Qos :: qos()}, SubPid :: pid()) -> any(). +subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> + gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}). + +%% +%% @doc Unsubscribe Topic +%% +-spec unsubscribe(Topic :: binary(), SubPid :: pid()) -> any(). +unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> + gen_server:cast(?SERVER, {unsubscribe, Topic, SubPid}). + +%% +%% @doc Publish to cluster node. +%% +-spec publish(Msg :: mqtt_msg()) -> ok. +publish(Msg=#mqtt_msg{topic=Topic}) -> + publish(Topic, Msg). + +-spec publish(Topic :: binary(), Msg :: mqtt_msg()) -> any(). +publish(Topic, Msg) when is_binary(Topic) -> + lists:foreach(fun(#topic{name=Name, node=Node}) -> + case Node =:= node() of + true -> dispatch(Name, Msg); + false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) + end + end, match(Topic)). + +%dispatch locally, should only be called by publish +dispatch(Topic, Msg) when is_binary(Topic) -> + [SubPid ! {dispatch, Msg} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)]. + +-spec match(Topic :: binary()) -> [topic()]. +match(Topic) when is_binary(Topic) -> + TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]), + Names = [Name || #topic_trie_node{topic=Name} <- TrieNodes, Name=/= undefined], + lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]). + %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ -init(Args) -> - {ok, Args}. +init([]) -> + mnesia:create_table(topic_trie, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_trie)}]), + mnesia:create_table(topic_trie_node, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_trie_node)}]), + mnesia:create_table(topic, [ + {type, bag}, + {record_name, topic}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic)}]), + mnesia:add_table_copy(topic_trie, node(), ram_copies), + mnesia:add_table_copy(topic_trie_node, node(), ram_copies), + mnesia:add_table_copy(topic, node(), ram_copies), + ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), + {ok, #state{}}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call({subscribe, {Topic, Qos}, SubPid}, _From, State) -> + case mnesia:transaction(fun trie_add/1, [Topic]) of + {atomic, _} -> + case get({subscriber, SubPid}) of + undefined -> + MonRef = erlang:monitor(process, SubPid), + put({subcriber, SubPid}, MonRef), + put({submon, MonRef}, SubPid); + _ -> + already_monitored + end, + ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), + {reply, ok, State}; + {aborted, Reason} -> + {reply, {error, Reason}, State} + end; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. -handle_info(_Info, State) -> - {noreply, State}. +handle_cast({unsubscribe, Topic, SubPid}, State) -> + ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), + try_remove_topic(Topic), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> + case get({submon, Mon}) of + undefined -> + ?ERROR("unexpected 'DOWN': ~p", [Mon]); + SubPid -> + %?INFO("subscriber DOWN: ~p", [SubPid]), + erase({submon, Mon}), + erase({subscriber, SubPid}), + Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}), + [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs], + [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] + end, + {noreply, State}; + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. terminate(_Reason, _State) -> ok. @@ -71,4 +195,99 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ +try_remove_topic(Name) when is_binary(Name) -> + case ets:member(topic_subscriber, Name) of + false -> + Topic = emqtt_topic:new(Name), + Fun = fun() -> + mnesia:delete_object(Topic), + case mnesia:read(topic, Name) of + [] -> trie_delete(Name); + _ -> ignore + end + end, + mnesia:transaction(Fun); + true -> + ok + end. + +trie_add(Topic) when is_binary(Topic) -> + mnesia:write(emqtt_topic:new(Topic)), + case mnesia:read(topic_trie_node, Topic) of + [TrieNode=#topic_trie_node{topic=undefined}] -> + mnesia:write(TrieNode#topic_trie_node{topic=Topic}); + [#topic_trie_node{topic=Topic}] -> + ignore; + [] -> + %add trie path + [trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)], + %add last node + mnesia:write(#topic_trie_node{node_id=Topic, topic=Topic}) + end. + +trie_delete(Topic) when is_binary(Topic) -> + case mnesia:read(topic_trie_node, Topic) of + [#topic_trie_node{edge_count=0}] -> + mnesia:delete({topic_trie_node, Topic}), + trie_delete_path(lists:reverse(emqtt_topic:triples(Topic))); + [TrieNode] -> + mnesia:write(TrieNode#topic_trie_node{topic=Topic}); + [] -> + ignore + end. + +trie_match(Words) -> + trie_match(root, Words, []). + +trie_match(NodeId, [], ResAcc) -> + mnesia:read(topic_trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc); + +trie_match(NodeId, [W|Words], ResAcc) -> + lists:foldl(fun(WArg, Acc) -> + case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word=WArg}) of + [#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); + [] -> Acc + end + end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]). + +'trie_match_#'(NodeId, ResAcc) -> + case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word="#"}) of + [#topic_trie{node_id=ChildId}] -> + mnesia:read(topic_trie_node, ChildId) ++ ResAcc; + [] -> + ResAcc + end. + +trie_add_path({Node, Word, Child}) -> + Edge = #topic_trie_edge{node_id=Node, word=Word}, + case mnesia:read(topic_trie_node, Node) of + [TrieNode = #topic_trie_node{edge_count=Count}] -> + case mnesia:read(topic_trie, Edge) of + [] -> + mnesia:write(TrieNode#topic_trie_node{edge_count=Count+1}), + mnesia:write(#topic_trie{edge=Edge, node_id=Child}); + [_] -> + ok + end; + [] -> + mnesia:write(#topic_trie_node{node_id=Node, edge_count=1}), + mnesia:write(#topic_trie{edge=Edge, node_id=Child}) + end. + +trie_delete_path([]) -> + ok; +trie_delete_path([{NodeId, Word, _} | RestPath]) -> + Edge = #topic_trie_edge{node_id=NodeId, word=Word}, + mnesia:delete({topic_trie, Edge}), + case mnesia:read(topic_trie_node, NodeId) of + [#topic_trie_node{edge_count=1, topic=undefined}] -> + mnesia:delete({topic_trie_node, NodeId}), + trie_delete_path(RestPath); + [TrieNode=#topic_trie_node{edge_count=1, topic=_}] -> + mnesia:write(TrieNode#topic_trie_node{edge_count=0}); + [TrieNode=#topic_trie_node{edge_count=C}] -> + mnesia:write(TrieNode#topic_trie_node{edge_count=C-1}); + [] -> + throw({notfound, NodeId}) + end. diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl deleted file mode 100644 index 3803215bb..000000000 --- a/apps/emqtt/src/emqtt_router.erl +++ /dev/null @@ -1,251 +0,0 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - --module(emqtt_router). - --include("emqtt.hrl"). - --include("emqtt_log.hrl"). - --include("emqtt_frame.hrl"). - --include("emqtt_internal.hrl"). - --include_lib("stdlib/include/qlc.hrl"). - --export([start_link/0]). - --export([topics/0, - subscribe/2, - unsubscribe/2, - publish/1, - publish/2, - route/2, - match/1, - down/1]). - --behaviour(gen_server). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -topics() -> - mnesia:dirty_all_keys(topic). - -subscribe({Topic, Qos}, Client) when is_pid(Client) -> - gen_server:call(?MODULE, {subscribe, {Topic, Qos}, Client}). - -unsubscribe(Topic, Client) when is_list(Topic) and is_pid(Client) -> - gen_server:cast(?MODULE, {unsubscribe, Topic, Client}). - -publish(Msg=#mqtt_msg{topic=Topic}) -> - publish(Topic, Msg). - -%publish to cluster node. -publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) -> - lists:foreach(fun(#topic{name=Name, node=Node}) -> - case Node == node() of - true -> route(Name, Msg); - false -> rpc:call(Node, ?MODULE, route, [Name, Msg]) - end - end, match(Topic)). - -%route locally, should only be called by publish -route(Topic, Msg) -> - [Client ! {route, Msg#mqtt_msg{qos=Qos}} || #subscriber{qos=Qos, client=Client} <- ets:lookup(subscriber, Topic)]. - -match(Topic) when is_list(Topic) -> - TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]), - Names = [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined], - lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]). - -%TODO: this api is really ugly -down(Client) when is_pid(Client) -> - gen_server:cast(?MODULE, {down, Client}). - -init([]) -> - mnesia:create_table(trie, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, trie)}]), - mnesia:add_table_copy(trie, node(), ram_copies), - mnesia:create_table(trie_node, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, trie_node)}]), - mnesia:add_table_copy(trie_node, node(), ram_copies), - mnesia:create_table(topic, [ - {type, bag}, - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(topic, node(), ram_copies), - ets:new(subscriber, [bag, named_table, {keypos, 2}]), - ?INFO_MSG("emqtt_router is started."), - {ok, #state{}}. - -handle_call({subscribe, {Topic, Qos}, Client}, _From, State) -> - case mnesia:transaction(fun trie_add/1, [Topic]) of - {atomic, _} -> - ets:insert(subscriber, #subscriber{topic=Topic, qos=Qos, client=Client}), - emqtt_retained:send(Topic, Client), - {reply, ok, State}; - {aborted, Reason} -> - {reply, {error, Reason}, State} - end; - -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({unsubscribe, Topic, Client}, State) -> - ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client, _='_'}), - try_remove_topic(Topic), - {noreply, State}; - -handle_cast({down, Client}, State) -> - case ets:match_object(subscriber, #subscriber{client=Client, _='_'}) of - [] -> ignore; - Subs -> - [ets:delete_object(subscriber, Sub) || Sub <- Subs], - [try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs] - end, - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ------------------------------------------------------------------------ -%% internal functions -%% ------------------------------------------------------------------------ -try_remove_topic(Name) -> - case ets:member(subscriber, Name) of - false -> - Topic = emqtt_topic:new(Name), - Fun = fun() -> - mnesia:delete_object(topic, Topic), - case mnesia:read(topic, Topic) of - [] -> trie_delete(Name); - _ -> ignore - end - end, - mnesia:transaction(Fun); - true -> - ok - end. - -trie_add(Topic) -> - mnesia:write(emqtt_topic:new(Topic)), - case mnesia:read(trie_node, Topic) of - [TrieNode=#trie_node{topic=undefined}] -> - mnesia:write(TrieNode#trie_node{topic=Topic}); - [#trie_node{topic=Topic}] -> - ignore; - [] -> - %add trie path - [trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)], - %add last node - mnesia:write(#trie_node{node_id=Topic, topic=Topic}) - end. - -trie_delete(Topic) -> - case mnesia:read(trie_node, Topic) of - [#trie_node{edge_count=0}] -> - mnesia:delete({trie_node, Topic}), - trie_delete_path(lists:reverse(emqtt_topic:triples(Topic))); - [TrieNode] -> - mnesia:write(TrieNode#trie_node{topic=Topic}); - [] -> - ignore - end. - -trie_match(Words) -> - trie_match(root, Words, []). - -trie_match(NodeId, [], ResAcc) -> - mnesia:read(trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc); - -trie_match(NodeId, [W|Words], ResAcc) -> - lists:foldl(fun(WArg, Acc) -> - case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of - [#trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); - [] -> Acc - end - end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]). - -'trie_match_#'(NodeId, ResAcc) -> - case mnesia:read(trie, #trie_edge{node_id=NodeId, word="#"}) of - [#trie{node_id=ChildId}] -> - mnesia:read(trie_node, ChildId) ++ ResAcc; - [] -> - ResAcc - end. - -trie_add_path({Node, Word, Child}) -> - Edge = #trie_edge{node_id=Node, word=Word}, - case mnesia:read(trie_node, Node) of - [TrieNode = #trie_node{edge_count=Count}] -> - case mnesia:read(trie, Edge) of - [] -> - mnesia:write(TrieNode#trie_node{edge_count=Count+1}), - mnesia:write(#trie{edge=Edge, node_id=Child}); - [_] -> - ok - end; - [] -> - mnesia:write(#trie_node{node_id=Node, edge_count=1}), - mnesia:write(#trie{edge=Edge, node_id=Child}) - end. - -trie_delete_path([]) -> - ok; -trie_delete_path([{NodeId, Word, _} | RestPath]) -> - Edge = #trie_edge{node_id=NodeId, word=Word}, - mnesia:delete({trie, Edge}), - case mnesia:read(trie_node, NodeId) of - [#trie_node{edge_count=1, topic=undefined}] -> - mnesia:delete({trie_node, NodeId}), - trie_delete_path(RestPath); - [TrieNode=#trie_node{edge_count=1, topic=_}] -> - mnesia:write(TrieNode#trie_node{edge_count=0}); - [TrieNode=#trie_node{edge_count=C}] -> - mnesia:write(TrieNode#trie_node{edge_count=C-1}); - [] -> - throw({notfound, NodeId}) - end. - - diff --git a/apps/emqtt/src/emqtt_sup.erl b/apps/emqtt/src/emqtt_sup.erl index f2baca7a0..f8e0af072 100644 --- a/apps/emqtt/src/emqtt_sup.erl +++ b/apps/emqtt/src/emqtt_sup.erl @@ -64,7 +64,6 @@ init([]) -> ?CHILD(emqtt_auth, worker), ?CHILD(emqtt_retained, worker), ?CHILD(emqtt_pubsub, worker), - ?CHILD(emqtt_router, worker), ?CHILD(emqtt_registry, worker)]} }.