rewrite emqttd_pubsub
This commit is contained in:
parent
f2b0449117
commit
6c9fc41c3b
|
@ -41,6 +41,27 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-type pubsub() :: publish | subscribe.
|
-type pubsub() :: publish | subscribe.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Topic
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_topic, {
|
||||||
|
name :: binary(),
|
||||||
|
node :: node()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type mqtt_topic() :: #mqtt_topic{}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Subscriber
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_subscriber, {
|
||||||
|
topic :: binary(),
|
||||||
|
qos = 0 :: non_neg_integer(),
|
||||||
|
subpid :: pid()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type mqtt_subscriber() :: #mqtt_subscriber{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Client
|
%% MQTT Client
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqtt packet header.
|
%%% MQTT Packet Header.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
@ -235,3 +235,4 @@
|
||||||
payload :: binary()}).
|
payload :: binary()}).
|
||||||
|
|
||||||
-type mqtt_message() :: #mqtt_message{}.
|
-type mqtt_message() :: #mqtt_message{}.
|
||||||
|
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
%%-----------------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
|
||||||
%%
|
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%% in the Software without restriction, including without limitation the rights
|
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%% furnished to do so, subject to the following conditions:
|
|
||||||
%%
|
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%% copies or substantial portions of the Software.
|
|
||||||
%%
|
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%% SOFTWARE.
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Core PubSub Topic
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(topic, {
|
|
||||||
name :: binary(),
|
|
||||||
%type = dynamic :: static | dynamic | bridge,
|
|
||||||
node :: node()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type topic() :: #topic{}.
|
|
||||||
|
|
||||||
-record(topic_subscriber, {
|
|
||||||
topic :: binary(),
|
|
||||||
qos = 0 :: non_neg_integer(),
|
|
||||||
subpid :: pid()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(topic_trie_node, {
|
|
||||||
node_id :: binary() | atom(),
|
|
||||||
edge_count = 0 :: non_neg_integer(),
|
|
||||||
topic :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(topic_trie_edge, {
|
|
||||||
node_id :: binary() | atom(),
|
|
||||||
word :: binary() | atom()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(topic_trie, {
|
|
||||||
edge :: #topic_trie_edge{},
|
|
||||||
node_id :: binary() | atom()
|
|
||||||
}).
|
|
||||||
|
|
|
@ -0,0 +1,214 @@
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
|
%%%
|
||||||
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
|
%%% furnished to do so, subject to the following conditions:
|
||||||
|
%%%
|
||||||
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
|
%%% copies or substantial portions of the Software.
|
||||||
|
%%%
|
||||||
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
%%% SOFTWARE.
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%% MQTT Topic Trie Tree.
|
||||||
|
%%%
|
||||||
|
%%% [Trie](http://en.wikipedia.org/wiki/Trie)
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
-module(emqttd_trie).
|
||||||
|
|
||||||
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
|
%% Mnesia Callbacks
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
-mnesia_create({mnesia, [create]}).
|
||||||
|
-mnesia_replicate({mnesia, [replicate]}).
|
||||||
|
|
||||||
|
%% Trie API
|
||||||
|
-export([insert/1, find/1, delete/1]).
|
||||||
|
|
||||||
|
-type node_id() :: binary() | atom().
|
||||||
|
|
||||||
|
-record(trie_node, {
|
||||||
|
node_id :: node_id(),
|
||||||
|
edge_count = 0 :: non_neg_integer(),
|
||||||
|
topic :: binary() | undefined
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(trie_edge, {
|
||||||
|
node_id :: node_id(),
|
||||||
|
word :: binary() | atom()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(trie, {
|
||||||
|
edge :: #trie_edge{},
|
||||||
|
node_id :: node_id()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% Mnesia Callbacks
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Create trie tables.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec mnesia(create | replicate) -> ok.
|
||||||
|
mnesia(create) ->
|
||||||
|
%% trie tree tables
|
||||||
|
ok = emqttd_mnesia:create_table(trie, [
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, trie},
|
||||||
|
{attributes, record_info(fields, trie)}]),
|
||||||
|
ok = emqttd_mnesia:create_table(trie_node, [
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, trie_node},
|
||||||
|
{attributes, record_info(fields, trie_node)}]);
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Replicate trie tables.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
mnesia(replicate) ->
|
||||||
|
ok = emqttd_mnesia:copy_table(trie),
|
||||||
|
ok = emqttd_mnesia:copy_table(trie_node).
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% API
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Insert topic to trie tree.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
insert(Topic) when is_binary(Topic) ->
|
||||||
|
case mnesia:read(trie_node, Topic) of
|
||||||
|
[#trie_node{topic=Topic}] ->
|
||||||
|
ok;
|
||||||
|
[TrieNode=#trie_node{topic=undefined}] ->
|
||||||
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
|
[] ->
|
||||||
|
%add trie path
|
||||||
|
[add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
||||||
|
%add last node
|
||||||
|
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Find trie nodes that match topic.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
find(Topic) when is_binary(Topic) ->
|
||||||
|
match_node(root, emqttd_topic:words(Topic), []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Delete topic to trie tree.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
delete(Topic) when is_binary(Topic) ->
|
||||||
|
case mnesia:read(trie_node, Topic) of
|
||||||
|
[#trie_node{edge_count=0}] ->
|
||||||
|
mnesia:delete({trie_node, Topic}),
|
||||||
|
delete_path(lists:reverse(emqttd_topic:triples(Topic)));
|
||||||
|
[TrieNode] ->
|
||||||
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
|
[] ->
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% @private
|
||||||
|
%% Add one path to trie tree.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
add_path({Node, Word, Child}) ->
|
||||||
|
Edge = #trie_edge{node_id=Node, word=Word},
|
||||||
|
case mnesia:read(trie_node, Node) of
|
||||||
|
[TrieNode = #trie_node{edge_count=Count}] ->
|
||||||
|
case mnesia:wread(trie, Edge) of
|
||||||
|
[] ->
|
||||||
|
mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
|
||||||
|
mnesia:write(#trie{edge=Edge, node_id=Child});
|
||||||
|
[_] ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
[] ->
|
||||||
|
mnesia:write(#trie_node{node_id=Node, edge_count=1}),
|
||||||
|
mnesia:write(#trie{edge=Edge, node_id=Child})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% @private
|
||||||
|
%% Match node include word or '+'.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
match_node(NodeId, [], ResAcc) ->
|
||||||
|
mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc);
|
||||||
|
|
||||||
|
match_node(NodeId, [W|Words], ResAcc) ->
|
||||||
|
lists:foldl(fun(WArg, Acc) ->
|
||||||
|
case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of
|
||||||
|
[#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc);
|
||||||
|
[] -> Acc
|
||||||
|
end
|
||||||
|
end, 'match_#'(NodeId, ResAcc), [W, '+']).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% @private
|
||||||
|
%% Match '#'.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
'match_#'(NodeId, ResAcc) ->
|
||||||
|
case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of
|
||||||
|
[#trie{node_id=ChildId}] ->
|
||||||
|
mnesia:read(trie_node, ChildId) ++ ResAcc;
|
||||||
|
[] ->
|
||||||
|
ResAcc
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% @private
|
||||||
|
%% Delete paths from trie tree.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
delete_path([]) ->
|
||||||
|
ok;
|
||||||
|
delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||||
|
mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}),
|
||||||
|
case mnesia:read(trie_node, NodeId) of
|
||||||
|
[#trie_node{edge_count=1, topic=undefined}] ->
|
||||||
|
mnesia:delete({trie_node, NodeId}),
|
||||||
|
delete_path(RestPath);
|
||||||
|
[TrieNode=#trie_node{edge_count=1, topic=_}] ->
|
||||||
|
mnesia:write(TrieNode#trie_node{edge_count=0});
|
||||||
|
[TrieNode=#trie_node{edge_count=C}] ->
|
||||||
|
mnesia:write(TrieNode#trie_node{edge_count=C-1});
|
||||||
|
[] ->
|
||||||
|
throw({notfound, NodeId})
|
||||||
|
end.
|
||||||
|
|
|
@ -129,13 +129,13 @@ create_table(Table, Attrs) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
copy_tables() ->
|
copy_tables() ->
|
||||||
ok = copy_ram_table(topic),
|
ok = copy_table(topic),
|
||||||
ok = copy_ram_table(topic_trie),
|
ok = copy_table(topic_trie),
|
||||||
ok = copy_ram_table(topic_trie_node),
|
ok = copy_table(topic_trie_node),
|
||||||
ok = copy_ram_table(topic_subscriber),
|
ok = copy_table(topic_subscriber),
|
||||||
ok = copy_ram_table(message_retained).
|
ok = copy_table(message_retained).
|
||||||
|
|
||||||
copy_ram_table(Table) ->
|
copy_table(Table) ->
|
||||||
case mnesia:add_table_copy(Table, node(), ram_copies) of
|
case mnesia:add_table_copy(Table, node(), ram_copies) of
|
||||||
{atomic, ok} -> ok;
|
{atomic, ok} -> ok;
|
||||||
{aborted, {already_exists, Table, _Node}} -> ok;
|
{aborted, {already_exists, Table, _Node}} -> ok;
|
||||||
|
|
|
@ -22,18 +22,12 @@
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd core pubsub.
|
%%% emqttd core pubsub.
|
||||||
%%%
|
%%%
|
||||||
%%% TODO: should not use gen_server:call to create, subscribe topics...
|
|
||||||
%%%
|
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_pubsub).
|
-module(emqttd_pubsub).
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_topic.hrl").
|
-include("emqttd_topic.hrl").
|
||||||
|
@ -42,28 +36,24 @@
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
|
|
||||||
-export([start_link/0, getstats/0]).
|
-export([start_link/0, getstats/0]).
|
||||||
|
|
||||||
-export([topics/0,
|
-export([create/1,
|
||||||
create/1,
|
subscribe/1, unsubscribe/1,
|
||||||
subscribe/1,
|
publish/1, publish/2,
|
||||||
unsubscribe/1,
|
|
||||||
publish/1,
|
|
||||||
publish/2,
|
|
||||||
%local node
|
%local node
|
||||||
dispatch/2,
|
dispatch/2, match/1]).
|
||||||
match/1]).
|
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
handle_call/3,
|
terminate/2, code_change/3]).
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2,
|
|
||||||
code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {}).
|
-record(state, {}).
|
||||||
|
|
||||||
|
@ -93,29 +83,19 @@ getstats() ->
|
||||||
{'subscribers/count', mnesia:table_info(topic_subscriber, size)},
|
{'subscribers/count', mnesia:table_info(topic_subscriber, size)},
|
||||||
{'subscribers/max', emqttd_broker:getstat('subscribers/max')}].
|
{'subscribers/max', emqttd_broker:getstat('subscribers/max')}].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% @doc
|
|
||||||
%% All Topics.
|
|
||||||
%%
|
|
||||||
%% @end
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-spec topics() -> list(binary()).
|
|
||||||
topics() ->
|
|
||||||
mnesia:dirty_all_keys(topic).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Create static topic.
|
%% Create static topic.
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}.
|
-spec create(binary()) -> ok.
|
||||||
create(Topic) when is_binary(Topic) ->
|
create(Topic) when is_binary(Topic) ->
|
||||||
{atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok.
|
{atomic, ok} = mnesia:transaction(fun add_topic/1, [emqttd_topic:new(Topic)]), ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Subscribe Topic or Topics
|
%% Subscribe topics
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -130,13 +110,15 @@ subscribe(Topics = [{_Topic, _Qos}|_]) ->
|
||||||
|
|
||||||
subscribe([], _SubPid, Acc) ->
|
subscribe([], _SubPid, Acc) ->
|
||||||
{ok, lists:reverse(Acc)};
|
{ok, lists:reverse(Acc)};
|
||||||
%%TODO: check this function later.
|
|
||||||
subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
||||||
Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid},
|
TopicObj = emqttd_topic:new(Topic),
|
||||||
F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end,
|
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = SubPid},
|
||||||
|
F = fun() -> trie_add(TopicObj), mnesia:write(Subscriber) end,
|
||||||
case mnesia:transaction(F) of
|
case mnesia:transaction(F) of
|
||||||
{atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]);
|
{atomic, ok} ->
|
||||||
Error -> {error, Error}
|
subscribe(Topics, SubPid, [Qos|Acc]);
|
||||||
|
Error ->
|
||||||
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -152,7 +134,6 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) ->
|
unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) ->
|
||||||
unsubscribe(Topics, self()).
|
unsubscribe(Topics, self()).
|
||||||
|
|
||||||
%%TODO: check this function later.
|
|
||||||
unsubscribe(Topics, SubPid) ->
|
unsubscribe(Topics, SubPid) ->
|
||||||
F = fun() ->
|
F = fun() ->
|
||||||
Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid),
|
Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid),
|
||||||
|
@ -303,86 +284,19 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% true ->
|
%% true ->
|
||||||
%% ok
|
%% ok
|
||||||
%% end.
|
%% end.
|
||||||
|
%%
|
||||||
trie_add(Topic) when is_binary(Topic) ->
|
add_topic(Topic = #topic{name = Name, node = Node}) ->
|
||||||
mnesia:write(emqttd_topic:new(Topic)),
|
case mnesia:wread(topic, Name) of
|
||||||
case mnesia:read(topic_trie_node, Topic) of
|
|
||||||
[TrieNode=#topic_trie_node{topic=undefined}] ->
|
|
||||||
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
|
||||||
[#topic_trie_node{topic=Topic}] ->
|
|
||||||
ok;
|
|
||||||
[] ->
|
[] ->
|
||||||
%add trie path
|
trie_add(Name),
|
||||||
[trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
mnesia:write(Topic);
|
||||||
%add last node
|
Topics ->
|
||||||
mnesia:write(#topic_trie_node{node_id=Topic, topic=Topic})
|
case lists:member(Topic, Topics) of
|
||||||
end.
|
true -> ok;
|
||||||
|
false -> mnesia:write(Topic)
|
||||||
trie_delete(Topic) when is_binary(Topic) ->
|
|
||||||
case mnesia:read(topic_trie_node, Topic) of
|
|
||||||
[#topic_trie_node{edge_count=0}] ->
|
|
||||||
mnesia:delete({topic_trie_node, Topic}),
|
|
||||||
trie_delete_path(lists:reverse(emqttd_topic:triples(Topic)));
|
|
||||||
[TrieNode] ->
|
|
||||||
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
|
||||||
[] ->
|
|
||||||
ignore
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_match(Words) ->
|
|
||||||
trie_match(root, Words, []).
|
|
||||||
|
|
||||||
trie_match(NodeId, [], ResAcc) ->
|
|
||||||
mnesia:read(topic_trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc);
|
|
||||||
|
|
||||||
trie_match(NodeId, [W|Words], ResAcc) ->
|
|
||||||
lists:foldl(fun(WArg, Acc) ->
|
|
||||||
case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word=WArg}) of
|
|
||||||
[#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc);
|
|
||||||
[] -> Acc
|
|
||||||
end
|
end
|
||||||
end, 'trie_match_#'(NodeId, ResAcc), [W, '+']).
|
|
||||||
|
|
||||||
'trie_match_#'(NodeId, ResAcc) ->
|
|
||||||
case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = '#'}) of
|
|
||||||
[#topic_trie{node_id=ChildId}] ->
|
|
||||||
mnesia:read(topic_trie_node, ChildId) ++ ResAcc;
|
|
||||||
[] ->
|
|
||||||
ResAcc
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
trie_add_path({Node, Word, Child}) ->
|
|
||||||
Edge = #topic_trie_edge{node_id=Node, word=Word},
|
|
||||||
case mnesia:read(topic_trie_node, Node) of
|
|
||||||
[TrieNode = #topic_trie_node{edge_count=Count}] ->
|
|
||||||
case mnesia:read(topic_trie, Edge) of
|
|
||||||
[] ->
|
|
||||||
mnesia:write(TrieNode#topic_trie_node{edge_count=Count+1}),
|
|
||||||
mnesia:write(#topic_trie{edge=Edge, node_id=Child});
|
|
||||||
[_] ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
[] ->
|
|
||||||
mnesia:write(#topic_trie_node{node_id=Node, edge_count=1}),
|
|
||||||
mnesia:write(#topic_trie{edge=Edge, node_id=Child})
|
|
||||||
end.
|
|
||||||
|
|
||||||
trie_delete_path([]) ->
|
|
||||||
ok;
|
|
||||||
trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
|
||||||
Edge = #topic_trie_edge{node_id=NodeId, word=Word},
|
|
||||||
mnesia:delete({topic_trie, Edge}),
|
|
||||||
case mnesia:read(topic_trie_node, NodeId) of
|
|
||||||
[#topic_trie_node{edge_count=1, topic=undefined}] ->
|
|
||||||
mnesia:delete({topic_trie_node, NodeId}),
|
|
||||||
trie_delete_path(RestPath);
|
|
||||||
[TrieNode=#topic_trie_node{edge_count=1, topic=_}] ->
|
|
||||||
mnesia:write(TrieNode#topic_trie_node{edge_count=0});
|
|
||||||
[TrieNode=#topic_trie_node{edge_count=C}] ->
|
|
||||||
mnesia:write(TrieNode#topic_trie_node{edge_count=C-1});
|
|
||||||
[] ->
|
|
||||||
throw({notfound, NodeId})
|
|
||||||
end.
|
|
||||||
|
|
||||||
upstats() ->
|
upstats() ->
|
||||||
upstats(topic), upstats(subscribe).
|
upstats(topic), upstats(subscribe).
|
||||||
|
|
|
@ -85,10 +85,10 @@ env() ->
|
||||||
CPid :: pid().
|
CPid :: pid().
|
||||||
redeliver(Topics, CPid) when is_pid(CPid) ->
|
redeliver(Topics, CPid) when is_pid(CPid) ->
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case emqttd_topic:type(#topic{name=Topic}) of
|
case emqttd_topic:wildcard(Topic) of
|
||||||
direct ->
|
false ->
|
||||||
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
|
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
|
||||||
wildcard ->
|
true ->
|
||||||
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
|
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
|
||||||
case emqttd_topic:match(Name, Topic) of
|
case emqttd_topic:match(Name, Topic) of
|
||||||
true -> [Msg|Acc];
|
true -> [Msg|Acc];
|
||||||
|
|
|
@ -32,7 +32,9 @@
|
||||||
|
|
||||||
-import(lists, [reverse/1]).
|
-import(lists, [reverse/1]).
|
||||||
|
|
||||||
-export([new/1, type/1, match/2, validate/1, triples/1, words/1]).
|
-export([new/1, new/2, wildcard/1, match/2, validate/1, triples/1, words/1]).
|
||||||
|
|
||||||
|
-type type() :: static | dynamic.
|
||||||
|
|
||||||
-type word() :: '' | '+' | '#' | binary().
|
-type word() :: '' | '+' | '#' | binary().
|
||||||
|
|
||||||
|
@ -40,7 +42,7 @@
|
||||||
|
|
||||||
-type triple() :: {root | binary(), word(), binary()}.
|
-type triple() :: {root | binary(), word(), binary()}.
|
||||||
|
|
||||||
-export_type([word/0, triple/0]).
|
-export_type([type/0, word/0, triple/0]).
|
||||||
|
|
||||||
-define(MAX_TOPIC_LEN, 65535).
|
-define(MAX_TOPIC_LEN, 65535).
|
||||||
|
|
||||||
|
@ -56,24 +58,34 @@ new(Name) when is_binary(Name) ->
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Topic Type: direct or wildcard
|
%% New Topic with Type
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-spec type(topic() | binary()) -> direct | wildcard.
|
-spec new(type(), binary()) -> topic().
|
||||||
type(#topic{ name = Name }) when is_binary(Name) ->
|
new(Type, Name) when (Type =:= static orelse Type =:= dynamic) andalso is_binary(Name) ->
|
||||||
type(Name);
|
#topic{name = Name, type = Type, node = node()}.
|
||||||
type(Topic) when is_binary(Topic) ->
|
|
||||||
type2(words(Topic)).
|
|
||||||
|
|
||||||
type2([]) ->
|
%%%-----------------------------------------------------------------------------
|
||||||
direct;
|
%% @doc
|
||||||
type2(['#'|_]) ->
|
%% Is Wildcard Topic.
|
||||||
wildcard;
|
%%
|
||||||
type2(['+'|_]) ->
|
%% @end
|
||||||
wildcard;
|
%%%-----------------------------------------------------------------------------
|
||||||
type2([_H |T]) ->
|
-spec wildcard(topic() | binary()) -> true | false.
|
||||||
type2(T).
|
wildcard(#topic{name = Name}) when is_binary(Name) ->
|
||||||
|
wildcard(Name);
|
||||||
|
wildcard(Topic) when is_binary(Topic) ->
|
||||||
|
wildcard2(words(Topic)).
|
||||||
|
|
||||||
|
wildcard2([]) ->
|
||||||
|
false;
|
||||||
|
wildcard2(['#'|_]) ->
|
||||||
|
true;
|
||||||
|
wildcard2(['+'|_]) ->
|
||||||
|
true;
|
||||||
|
wildcard2([_H |T]) ->
|
||||||
|
wildcard2(T).
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
|
Loading…
Reference in New Issue