merge 'slimrt' pubsub module
This commit is contained in:
parent
be019ca033
commit
0dbb739416
|
@ -37,6 +37,8 @@
|
||||||
-define(QOS_1, 1).
|
-define(QOS_1, 1).
|
||||||
-define(QOS_2, 2).
|
-define(QOS_2, 2).
|
||||||
|
|
||||||
|
-type qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0.
|
||||||
|
|
||||||
-record(mqtt_msg, {
|
-record(mqtt_msg, {
|
||||||
retain,
|
retain,
|
||||||
qos,
|
qos,
|
||||||
|
@ -47,3 +49,6 @@
|
||||||
encoder
|
encoder
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type mqtt_msg() :: #mqtt_msg{}.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,7 @@
|
||||||
{description, "Erlang MQTT Broker"},
|
{description, "Erlang MQTT Broker"},
|
||||||
{vsn, git},
|
{vsn, git},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqtt_auth,
|
{registered, [ ]},
|
||||||
emqtt_router,
|
|
||||||
emqtt_registry
|
|
||||||
]},
|
|
||||||
{applications, [kernel,
|
{applications, [kernel,
|
||||||
stdlib]},
|
stdlib]},
|
||||||
{mod, {emqtt_app, []}},
|
{mod, {emqtt_app, []}},
|
||||||
|
|
|
@ -273,7 +273,7 @@ process_request(?CONNECT,
|
||||||
|
|
||||||
process_request(?PUBLISH, Frame=#mqtt_frame{
|
process_request(?PUBLISH, Frame=#mqtt_frame{
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
||||||
emqtt_router:publish(make_msg(Frame)),
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
process_request(?PUBLISH,
|
process_request(?PUBLISH,
|
||||||
|
@ -281,7 +281,7 @@ process_request(?PUBLISH,
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
State=#state{socket=Sock}) ->
|
State=#state{socket=Sock}) ->
|
||||||
emqtt_router:publish(make_msg(Frame)),
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
@ -291,7 +291,7 @@ process_request(?PUBLISH,
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
State=#state{socket=Sock}) ->
|
State=#state{socket=Sock}) ->
|
||||||
emqtt_router:publish(make_msg(Frame)),
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
put({msg, MsgId}, pubrec),
|
put({msg, MsgId}, pubrec),
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
@ -333,7 +333,7 @@ process_request(?SUBSCRIBE,
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
#state{socket=Sock} = State) ->
|
#state{socket=Sock} = State) ->
|
||||||
|
|
||||||
[emqtt_router:subscribe({Name, Qos}, self()) ||
|
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
||||||
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
||||||
|
|
||||||
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
||||||
|
@ -352,7 +352,7 @@ process_request(?UNSUBSCRIBE,
|
||||||
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
|
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
|
||||||
|
|
||||||
|
|
||||||
[emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
||||||
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
||||||
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
||||||
|
|
|
@ -46,7 +46,7 @@ handle('POST', "/mqtt/publish", Req) ->
|
||||||
Message = list_to_binary(get_value("message", Params)),
|
Message = list_to_binary(get_value("message", Params)),
|
||||||
Qos = list_to_integer(get_value("qos", Params, "0")),
|
Qos = list_to_integer(get_value("qos", Params, "0")),
|
||||||
%TODO: DUP, RETAIN...
|
%TODO: DUP, RETAIN...
|
||||||
emqtt_router:publish(Topic, #mqtt_msg {
|
emqtt_pubsub:publish(#mqtt_msg {
|
||||||
retain = 0,
|
retain = 0,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
|
|
|
@ -22,9 +22,13 @@
|
||||||
|
|
||||||
-module(emqtt_pubsub).
|
-module(emqtt_pubsub).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-include("emqtt_log.hrl").
|
||||||
|
|
||||||
|
-include("emqtt_internal.hrl").
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
|
@ -32,35 +36,155 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([topics/0,
|
||||||
|
subscribe/2,
|
||||||
|
unsubscribe/2,
|
||||||
|
publish/1,
|
||||||
|
publish/2,
|
||||||
|
%local node
|
||||||
|
dispatch/2,
|
||||||
|
match/1]).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-behaviour(gen_server).
|
||||||
terminate/2, code_change/3]).
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-export([init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {}).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% API Function Definitions
|
%% API Function Definitions
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc Start Pubsub.
|
||||||
|
%%
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc All topics
|
||||||
|
%%
|
||||||
|
-spec topics() -> list(topic()).
|
||||||
|
topics() ->
|
||||||
|
mnesia:dirty_all_keys(topic).
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc Subscribe Topic
|
||||||
|
%%
|
||||||
|
-spec subscribe({Topic :: binary(), Qos :: qos()}, SubPid :: pid()) -> any().
|
||||||
|
subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
||||||
|
gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}).
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc Unsubscribe Topic
|
||||||
|
%%
|
||||||
|
-spec unsubscribe(Topic :: binary(), SubPid :: pid()) -> any().
|
||||||
|
unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
||||||
|
gen_server:cast(?SERVER, {unsubscribe, Topic, SubPid}).
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc Publish to cluster node.
|
||||||
|
%%
|
||||||
|
-spec publish(Msg :: mqtt_msg()) -> ok.
|
||||||
|
publish(Msg=#mqtt_msg{topic=Topic}) ->
|
||||||
|
publish(Topic, Msg).
|
||||||
|
|
||||||
|
-spec publish(Topic :: binary(), Msg :: mqtt_msg()) -> any().
|
||||||
|
publish(Topic, Msg) when is_binary(Topic) ->
|
||||||
|
lists:foreach(fun(#topic{name=Name, node=Node}) ->
|
||||||
|
case Node =:= node() of
|
||||||
|
true -> dispatch(Name, Msg);
|
||||||
|
false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg])
|
||||||
|
end
|
||||||
|
end, match(Topic)).
|
||||||
|
|
||||||
|
%dispatch locally, should only be called by publish
|
||||||
|
dispatch(Topic, Msg) when is_binary(Topic) ->
|
||||||
|
[SubPid ! {dispatch, Msg} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)].
|
||||||
|
|
||||||
|
-spec match(Topic :: binary()) -> [topic()].
|
||||||
|
match(Topic) when is_binary(Topic) ->
|
||||||
|
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
|
||||||
|
Names = [Name || #topic_trie_node{topic=Name} <- TrieNodes, Name=/= undefined],
|
||||||
|
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% gen_server Function Definitions
|
%% gen_server Function Definitions
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
||||||
init(Args) ->
|
init([]) ->
|
||||||
{ok, Args}.
|
mnesia:create_table(topic_trie, [
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{attributes, record_info(fields, topic_trie)}]),
|
||||||
|
mnesia:create_table(topic_trie_node, [
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{attributes, record_info(fields, topic_trie_node)}]),
|
||||||
|
mnesia:create_table(topic, [
|
||||||
|
{type, bag},
|
||||||
|
{record_name, topic},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{attributes, record_info(fields, topic)}]),
|
||||||
|
mnesia:add_table_copy(topic_trie, node(), ram_copies),
|
||||||
|
mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
|
||||||
|
mnesia:add_table_copy(topic, node(), ram_copies),
|
||||||
|
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
|
||||||
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call({subscribe, {Topic, Qos}, SubPid}, _From, State) ->
|
||||||
{reply, ok, State}.
|
case mnesia:transaction(fun trie_add/1, [Topic]) of
|
||||||
|
{atomic, _} ->
|
||||||
|
case get({subscriber, SubPid}) of
|
||||||
|
undefined ->
|
||||||
|
MonRef = erlang:monitor(process, SubPid),
|
||||||
|
put({subcriber, SubPid}, MonRef),
|
||||||
|
put({submon, MonRef}, SubPid);
|
||||||
|
_ ->
|
||||||
|
already_monitored
|
||||||
|
end,
|
||||||
|
ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}),
|
||||||
|
{reply, ok, State};
|
||||||
|
{aborted, Reason} ->
|
||||||
|
{reply, {error, Reason}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
{noreply, State}.
|
{stop, {badreq, Req}, State}.
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_cast({unsubscribe, Topic, SubPid}, State) ->
|
||||||
{noreply, State}.
|
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
|
||||||
|
try_remove_topic(Topic),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
{stop, {badmsg, Msg}, State}.
|
||||||
|
|
||||||
|
handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
||||||
|
case get({submon, Mon}) of
|
||||||
|
undefined ->
|
||||||
|
?ERROR("unexpected 'DOWN': ~p", [Mon]);
|
||||||
|
SubPid ->
|
||||||
|
%?INFO("subscriber DOWN: ~p", [SubPid]),
|
||||||
|
erase({submon, Mon}),
|
||||||
|
erase({subscriber, SubPid}),
|
||||||
|
Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}),
|
||||||
|
[ets:delete_object(topic_subscriber, Sub) || Sub <- Subs],
|
||||||
|
[try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
||||||
|
end,
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
{stop, {badinfo, Info}, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -71,4 +195,99 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% Internal Function Definitions
|
%% Internal Function Definitions
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
try_remove_topic(Name) when is_binary(Name) ->
|
||||||
|
case ets:member(topic_subscriber, Name) of
|
||||||
|
false ->
|
||||||
|
Topic = emqtt_topic:new(Name),
|
||||||
|
Fun = fun() ->
|
||||||
|
mnesia:delete_object(Topic),
|
||||||
|
case mnesia:read(topic, Name) of
|
||||||
|
[] -> trie_delete(Name);
|
||||||
|
_ -> ignore
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
mnesia:transaction(Fun);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
trie_add(Topic) when is_binary(Topic) ->
|
||||||
|
mnesia:write(emqtt_topic:new(Topic)),
|
||||||
|
case mnesia:read(topic_trie_node, Topic) of
|
||||||
|
[TrieNode=#topic_trie_node{topic=undefined}] ->
|
||||||
|
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
||||||
|
[#topic_trie_node{topic=Topic}] ->
|
||||||
|
ignore;
|
||||||
|
[] ->
|
||||||
|
%add trie path
|
||||||
|
[trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
|
||||||
|
%add last node
|
||||||
|
mnesia:write(#topic_trie_node{node_id=Topic, topic=Topic})
|
||||||
|
end.
|
||||||
|
|
||||||
|
trie_delete(Topic) when is_binary(Topic) ->
|
||||||
|
case mnesia:read(topic_trie_node, Topic) of
|
||||||
|
[#topic_trie_node{edge_count=0}] ->
|
||||||
|
mnesia:delete({topic_trie_node, Topic}),
|
||||||
|
trie_delete_path(lists:reverse(emqtt_topic:triples(Topic)));
|
||||||
|
[TrieNode] ->
|
||||||
|
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
||||||
|
[] ->
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
trie_match(Words) ->
|
||||||
|
trie_match(root, Words, []).
|
||||||
|
|
||||||
|
trie_match(NodeId, [], ResAcc) ->
|
||||||
|
mnesia:read(topic_trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc);
|
||||||
|
|
||||||
|
trie_match(NodeId, [W|Words], ResAcc) ->
|
||||||
|
lists:foldl(fun(WArg, Acc) ->
|
||||||
|
case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word=WArg}) of
|
||||||
|
[#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc);
|
||||||
|
[] -> Acc
|
||||||
|
end
|
||||||
|
end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]).
|
||||||
|
|
||||||
|
'trie_match_#'(NodeId, ResAcc) ->
|
||||||
|
case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word="#"}) of
|
||||||
|
[#topic_trie{node_id=ChildId}] ->
|
||||||
|
mnesia:read(topic_trie_node, ChildId) ++ ResAcc;
|
||||||
|
[] ->
|
||||||
|
ResAcc
|
||||||
|
end.
|
||||||
|
|
||||||
|
trie_add_path({Node, Word, Child}) ->
|
||||||
|
Edge = #topic_trie_edge{node_id=Node, word=Word},
|
||||||
|
case mnesia:read(topic_trie_node, Node) of
|
||||||
|
[TrieNode = #topic_trie_node{edge_count=Count}] ->
|
||||||
|
case mnesia:read(topic_trie, Edge) of
|
||||||
|
[] ->
|
||||||
|
mnesia:write(TrieNode#topic_trie_node{edge_count=Count+1}),
|
||||||
|
mnesia:write(#topic_trie{edge=Edge, node_id=Child});
|
||||||
|
[_] ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
[] ->
|
||||||
|
mnesia:write(#topic_trie_node{node_id=Node, edge_count=1}),
|
||||||
|
mnesia:write(#topic_trie{edge=Edge, node_id=Child})
|
||||||
|
end.
|
||||||
|
|
||||||
|
trie_delete_path([]) ->
|
||||||
|
ok;
|
||||||
|
trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||||
|
Edge = #topic_trie_edge{node_id=NodeId, word=Word},
|
||||||
|
mnesia:delete({topic_trie, Edge}),
|
||||||
|
case mnesia:read(topic_trie_node, NodeId) of
|
||||||
|
[#topic_trie_node{edge_count=1, topic=undefined}] ->
|
||||||
|
mnesia:delete({topic_trie_node, NodeId}),
|
||||||
|
trie_delete_path(RestPath);
|
||||||
|
[TrieNode=#topic_trie_node{edge_count=1, topic=_}] ->
|
||||||
|
mnesia:write(TrieNode#topic_trie_node{edge_count=0});
|
||||||
|
[TrieNode=#topic_trie_node{edge_count=C}] ->
|
||||||
|
mnesia:write(TrieNode#topic_trie_node{edge_count=C-1});
|
||||||
|
[] ->
|
||||||
|
throw({notfound, NodeId})
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
|
@ -1,251 +0,0 @@
|
||||||
%%-----------------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2014, Feng Lee <feng.lee@slimchat.io>
|
|
||||||
%%
|
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%% in the Software without restriction, including without limitation the rights
|
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%% furnished to do so, subject to the following conditions:
|
|
||||||
%%
|
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%% copies or substantial portions of the Software.
|
|
||||||
%%
|
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%% SOFTWARE.
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqtt_router).
|
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_internal.hrl").
|
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
-export([topics/0,
|
|
||||||
subscribe/2,
|
|
||||||
unsubscribe/2,
|
|
||||||
publish/1,
|
|
||||||
publish/2,
|
|
||||||
route/2,
|
|
||||||
match/1,
|
|
||||||
down/1]).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-export([init/1,
|
|
||||||
handle_call/3,
|
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2,
|
|
||||||
code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {}).
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
topics() ->
|
|
||||||
mnesia:dirty_all_keys(topic).
|
|
||||||
|
|
||||||
subscribe({Topic, Qos}, Client) when is_pid(Client) ->
|
|
||||||
gen_server:call(?MODULE, {subscribe, {Topic, Qos}, Client}).
|
|
||||||
|
|
||||||
unsubscribe(Topic, Client) when is_list(Topic) and is_pid(Client) ->
|
|
||||||
gen_server:cast(?MODULE, {unsubscribe, Topic, Client}).
|
|
||||||
|
|
||||||
publish(Msg=#mqtt_msg{topic=Topic}) ->
|
|
||||||
publish(Topic, Msg).
|
|
||||||
|
|
||||||
%publish to cluster node.
|
|
||||||
publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) ->
|
|
||||||
lists:foreach(fun(#topic{name=Name, node=Node}) ->
|
|
||||||
case Node == node() of
|
|
||||||
true -> route(Name, Msg);
|
|
||||||
false -> rpc:call(Node, ?MODULE, route, [Name, Msg])
|
|
||||||
end
|
|
||||||
end, match(Topic)).
|
|
||||||
|
|
||||||
%route locally, should only be called by publish
|
|
||||||
route(Topic, Msg) ->
|
|
||||||
[Client ! {route, Msg#mqtt_msg{qos=Qos}} || #subscriber{qos=Qos, client=Client} <- ets:lookup(subscriber, Topic)].
|
|
||||||
|
|
||||||
match(Topic) when is_list(Topic) ->
|
|
||||||
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
|
|
||||||
Names = [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined],
|
|
||||||
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]).
|
|
||||||
|
|
||||||
%TODO: this api is really ugly
|
|
||||||
down(Client) when is_pid(Client) ->
|
|
||||||
gen_server:cast(?MODULE, {down, Client}).
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
mnesia:create_table(trie, [
|
|
||||||
{ram_copies, [node()]},
|
|
||||||
{attributes, record_info(fields, trie)}]),
|
|
||||||
mnesia:add_table_copy(trie, node(), ram_copies),
|
|
||||||
mnesia:create_table(trie_node, [
|
|
||||||
{ram_copies, [node()]},
|
|
||||||
{attributes, record_info(fields, trie_node)}]),
|
|
||||||
mnesia:add_table_copy(trie_node, node(), ram_copies),
|
|
||||||
mnesia:create_table(topic, [
|
|
||||||
{type, bag},
|
|
||||||
{record_name, topic},
|
|
||||||
{ram_copies, [node()]},
|
|
||||||
{attributes, record_info(fields, topic)}]),
|
|
||||||
mnesia:add_table_copy(topic, node(), ram_copies),
|
|
||||||
ets:new(subscriber, [bag, named_table, {keypos, 2}]),
|
|
||||||
?INFO_MSG("emqtt_router is started."),
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
handle_call({subscribe, {Topic, Qos}, Client}, _From, State) ->
|
|
||||||
case mnesia:transaction(fun trie_add/1, [Topic]) of
|
|
||||||
{atomic, _} ->
|
|
||||||
ets:insert(subscriber, #subscriber{topic=Topic, qos=Qos, client=Client}),
|
|
||||||
emqtt_retained:send(Topic, Client),
|
|
||||||
{reply, ok, State};
|
|
||||||
{aborted, Reason} ->
|
|
||||||
{reply, {error, Reason}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
|
||||||
{stop, {badreq, Req}, State}.
|
|
||||||
|
|
||||||
handle_cast({unsubscribe, Topic, Client}, State) ->
|
|
||||||
ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client, _='_'}),
|
|
||||||
try_remove_topic(Topic),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast({down, Client}, State) ->
|
|
||||||
case ets:match_object(subscriber, #subscriber{client=Client, _='_'}) of
|
|
||||||
[] -> ignore;
|
|
||||||
Subs ->
|
|
||||||
[ets:delete_object(subscriber, Sub) || Sub <- Subs],
|
|
||||||
[try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs]
|
|
||||||
end,
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
{stop, {badmsg, Msg}, State}.
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
{stop, {badinfo, Info}, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------------
|
|
||||||
%% internal functions
|
|
||||||
%% ------------------------------------------------------------------------
|
|
||||||
try_remove_topic(Name) ->
|
|
||||||
case ets:member(subscriber, Name) of
|
|
||||||
false ->
|
|
||||||
Topic = emqtt_topic:new(Name),
|
|
||||||
Fun = fun() ->
|
|
||||||
mnesia:delete_object(topic, Topic),
|
|
||||||
case mnesia:read(topic, Topic) of
|
|
||||||
[] -> trie_delete(Name);
|
|
||||||
_ -> ignore
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
mnesia:transaction(Fun);
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_add(Topic) ->
|
|
||||||
mnesia:write(emqtt_topic:new(Topic)),
|
|
||||||
case mnesia:read(trie_node, Topic) of
|
|
||||||
[TrieNode=#trie_node{topic=undefined}] ->
|
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
|
||||||
[#trie_node{topic=Topic}] ->
|
|
||||||
ignore;
|
|
||||||
[] ->
|
|
||||||
%add trie path
|
|
||||||
[trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
|
|
||||||
%add last node
|
|
||||||
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_delete(Topic) ->
|
|
||||||
case mnesia:read(trie_node, Topic) of
|
|
||||||
[#trie_node{edge_count=0}] ->
|
|
||||||
mnesia:delete({trie_node, Topic}),
|
|
||||||
trie_delete_path(lists:reverse(emqtt_topic:triples(Topic)));
|
|
||||||
[TrieNode] ->
|
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
|
||||||
[] ->
|
|
||||||
ignore
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_match(Words) ->
|
|
||||||
trie_match(root, Words, []).
|
|
||||||
|
|
||||||
trie_match(NodeId, [], ResAcc) ->
|
|
||||||
mnesia:read(trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc);
|
|
||||||
|
|
||||||
trie_match(NodeId, [W|Words], ResAcc) ->
|
|
||||||
lists:foldl(fun(WArg, Acc) ->
|
|
||||||
case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of
|
|
||||||
[#trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc);
|
|
||||||
[] -> Acc
|
|
||||||
end
|
|
||||||
end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]).
|
|
||||||
|
|
||||||
'trie_match_#'(NodeId, ResAcc) ->
|
|
||||||
case mnesia:read(trie, #trie_edge{node_id=NodeId, word="#"}) of
|
|
||||||
[#trie{node_id=ChildId}] ->
|
|
||||||
mnesia:read(trie_node, ChildId) ++ ResAcc;
|
|
||||||
[] ->
|
|
||||||
ResAcc
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_add_path({Node, Word, Child}) ->
|
|
||||||
Edge = #trie_edge{node_id=Node, word=Word},
|
|
||||||
case mnesia:read(trie_node, Node) of
|
|
||||||
[TrieNode = #trie_node{edge_count=Count}] ->
|
|
||||||
case mnesia:read(trie, Edge) of
|
|
||||||
[] ->
|
|
||||||
mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
|
|
||||||
mnesia:write(#trie{edge=Edge, node_id=Child});
|
|
||||||
[_] ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
[] ->
|
|
||||||
mnesia:write(#trie_node{node_id=Node, edge_count=1}),
|
|
||||||
mnesia:write(#trie{edge=Edge, node_id=Child})
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_delete_path([]) ->
|
|
||||||
ok;
|
|
||||||
trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
|
||||||
Edge = #trie_edge{node_id=NodeId, word=Word},
|
|
||||||
mnesia:delete({trie, Edge}),
|
|
||||||
case mnesia:read(trie_node, NodeId) of
|
|
||||||
[#trie_node{edge_count=1, topic=undefined}] ->
|
|
||||||
mnesia:delete({trie_node, NodeId}),
|
|
||||||
trie_delete_path(RestPath);
|
|
||||||
[TrieNode=#trie_node{edge_count=1, topic=_}] ->
|
|
||||||
mnesia:write(TrieNode#trie_node{edge_count=0});
|
|
||||||
[TrieNode=#trie_node{edge_count=C}] ->
|
|
||||||
mnesia:write(TrieNode#trie_node{edge_count=C-1});
|
|
||||||
[] ->
|
|
||||||
throw({notfound, NodeId})
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,6 @@ init([]) ->
|
||||||
?CHILD(emqtt_auth, worker),
|
?CHILD(emqtt_auth, worker),
|
||||||
?CHILD(emqtt_retained, worker),
|
?CHILD(emqtt_retained, worker),
|
||||||
?CHILD(emqtt_pubsub, worker),
|
?CHILD(emqtt_pubsub, worker),
|
||||||
?CHILD(emqtt_router, worker),
|
|
||||||
?CHILD(emqtt_registry, worker)]}
|
?CHILD(emqtt_registry, worker)]}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue