topic, subscriber
This commit is contained in:
parent
9b6cb812df
commit
e47e3c1fa8
|
@ -56,7 +56,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_subscriber, {
|
-record(mqtt_subscriber, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
qos = 0 :: non_neg_integer(),
|
qos = 0 :: 0 | 1 | 2,
|
||||||
subpid :: pid()
|
subpid :: pid()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,9 @@
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Create trie tables.
|
%% @doc
|
||||||
|
%% Create trie tables.
|
||||||
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec mnesia(create | replicate) -> ok.
|
-spec mnesia(create | replicate) -> ok.
|
||||||
|
@ -78,7 +80,9 @@ mnesia(create) ->
|
||||||
{attributes, record_info(fields, trie_node)}]);
|
{attributes, record_info(fields, trie_node)}]);
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Replicate trie tables.
|
%% @doc
|
||||||
|
%% Replicate trie tables.
|
||||||
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
mnesia(replicate) ->
|
mnesia(replicate) ->
|
||||||
|
@ -90,7 +94,9 @@ mnesia(replicate) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Insert topic to trie tree.
|
%% @doc
|
||||||
|
%% Insert topic to trie tree.
|
||||||
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
insert(Topic) when is_binary(Topic) ->
|
insert(Topic) when is_binary(Topic) ->
|
||||||
|
@ -107,14 +113,18 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Find trie nodes that match topic.
|
%% @doc
|
||||||
|
%% Find trie nodes that match topic.
|
||||||
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
find(Topic) when is_binary(Topic) ->
|
find(Topic) when is_binary(Topic) ->
|
||||||
match_node(root, emqttd_topic:words(Topic), []).
|
match_node(root, emqttd_topic:words(Topic), []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Delete topic to trie tree.
|
%% @doc
|
||||||
|
%% Delete topic from trie tree.
|
||||||
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
delete(Topic) when is_binary(Topic) ->
|
delete(Topic) when is_binary(Topic) ->
|
||||||
|
@ -128,7 +138,6 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
@ -136,7 +145,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% @private
|
%% @private
|
||||||
%% Add one path to trie tree.
|
%% Add path to trie tree.
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -159,7 +168,7 @@ add_path({Node, Word, Child}) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% @private
|
%% @private
|
||||||
%% Match node include word or '+'.
|
%% Match node with word or '+'.
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -177,7 +186,7 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% @private
|
%% @private
|
||||||
%% Match '#'.
|
%% Match node with '#'.
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -32,15 +32,12 @@
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd_packet.hrl").
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
|
-export([start_link/0]).
|
||||||
-export([start_link/0, getstats/0]).
|
|
||||||
|
|
||||||
-export([create/1,
|
-export([create/1,
|
||||||
subscribe/1, unsubscribe/1,
|
subscribe/1, unsubscribe/1,
|
||||||
|
@ -49,7 +46,6 @@
|
||||||
dispatch/2, match/1]).
|
dispatch/2, match/1]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
@ -71,25 +67,13 @@ start_link() ->
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Get stats of PubSub.
|
%% Create topic.
|
||||||
%%
|
|
||||||
%% @end
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-spec getstats() -> [{atom(), non_neg_integer()}].
|
|
||||||
getstats() ->
|
|
||||||
[{'topics/count', mnesia:table_info(topic, size)},
|
|
||||||
{'subscribers/count', mnesia:table_info(topic_subscriber, size)},
|
|
||||||
{'subscribers/max', emqttd_broker:getstat('subscribers/max')}].
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% @doc
|
|
||||||
%% Create static topic.
|
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec create(binary()) -> ok.
|
-spec create(binary()) -> ok.
|
||||||
create(Topic) when is_binary(Topic) ->
|
create(Topic) when is_binary(Topic) ->
|
||||||
{atomic, ok} = mnesia:transaction(fun add_topic/1, [emqttd_topic:new(Topic)]), ok.
|
{atomic, ok} = mnesia:transaction(fun insert_topic/1, [emqttd_topic:new(Topic)]), ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -97,28 +81,38 @@ create(Topic) when is_binary(Topic) ->
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}.
|
-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when
|
||||||
subscribe({Topic, Qos}) when is_binary(Topic) ->
|
Topic :: binary(),
|
||||||
case subscribe([{Topic, Qos}]) of
|
Qos :: mqtt_qos().
|
||||||
{ok, [GrantedQos]} -> {ok, GrantedQos};
|
subscribe(Topics = [{Topic, Qos}|_]) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
||||||
{error, Error} -> {error, Error}
|
subscribe2(Topics, []).
|
||||||
end;
|
|
||||||
subscribe(Topics = [{_Topic, _Qos}|_]) ->
|
|
||||||
subscribe(Topics, self(), []).
|
|
||||||
|
|
||||||
subscribe([], _SubPid, Acc) ->
|
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
|
||||||
{ok, lists:reverse(Acc)};
|
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
||||||
subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
TopicRecord = emqttd_topic:new(Topic),
|
||||||
TopicObj = emqttd_topic:new(Topic),
|
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()},
|
||||||
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = SubPid},
|
F = fun() ->
|
||||||
F = fun() -> trie_add(TopicObj), mnesia:write(Subscriber) end,
|
case insert_topic(TopicRecord) of
|
||||||
|
ok -> insert_subscriber(Subscriber);
|
||||||
|
Error -> Error
|
||||||
|
end
|
||||||
|
end,
|
||||||
case mnesia:transaction(F) of
|
case mnesia:transaction(F) of
|
||||||
{atomic, ok} ->
|
{atomic, ok} -> {ok, Qos};
|
||||||
subscribe(Topics, SubPid, [Qos|Acc]);
|
Error -> Error
|
||||||
Error ->
|
|
||||||
{error, Error}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
subscribe2([], QosAcc) ->
|
||||||
|
{ok, lists:reverse(QosAcc)};
|
||||||
|
subscribe2([{Topic, Qos}|Topics], Acc) ->
|
||||||
|
case subscribe(Topic, Qos) of
|
||||||
|
{ok, GrantedQos} ->
|
||||||
|
subscribe2(Topics, [GrantedQos|Acc]);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Unsubscribe Topic or Topics
|
%% Unsubscribe Topic or Topics
|
||||||
|
@ -283,7 +277,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% ok
|
%% ok
|
||||||
%% end.
|
%% end.
|
||||||
%%
|
%%
|
||||||
add_topic(Topic = #topic{name = Name, node = Node}) ->
|
insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) ->
|
||||||
case mnesia:wread(topic, Name) of
|
case mnesia:wread(topic, Name) of
|
||||||
[] ->
|
[] ->
|
||||||
trie_add(Name),
|
trie_add(Name),
|
||||||
|
|
|
@ -28,11 +28,13 @@
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-import(lists, [reverse/1]).
|
-import(lists, [reverse/1]).
|
||||||
|
|
||||||
-export([new/1, new/2, wildcard/1, match/2, validate/1, triples/1, words/1]).
|
-export([new/1, wildcard/1, match/2, validate/1, triples/1, words/1]).
|
||||||
|
|
||||||
-type type() :: static | dynamic.
|
%-type type() :: static | dynamic.
|
||||||
|
|
||||||
-type word() :: '' | '+' | '#' | binary().
|
-type word() :: '' | '+' | '#' | binary().
|
||||||
|
|
||||||
|
@ -40,7 +42,7 @@
|
||||||
|
|
||||||
-type triple() :: {root | binary(), word(), binary()}.
|
-type triple() :: {root | binary(), word(), binary()}.
|
||||||
|
|
||||||
-export_type([type/0, word/0, triple/0]).
|
-export_type([word/0, triple/0]).
|
||||||
|
|
||||||
-define(MAX_TOPIC_LEN, 65535).
|
-define(MAX_TOPIC_LEN, 65535).
|
||||||
|
|
||||||
|
@ -50,40 +52,29 @@
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-spec new(binary()) -> topic().
|
-spec new(binary()) -> mqtt_topic().
|
||||||
new(Name) when is_binary(Name) ->
|
new(Name) when is_binary(Name) ->
|
||||||
#topic{name = Name, node = node()}.
|
#mqtt_topic{name = Name, node = node()}.
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% New Topic with Type
|
%% Is Wildcard Topic?
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-spec new(type(), binary()) -> topic().
|
-spec wildcard(mqtt_topic() | binary()) -> true | false.
|
||||||
new(Type, Name) when (Type =:= static orelse Type =:= dynamic) andalso is_binary(Name) ->
|
wildcard(#mqtt_topic{name = Name}) when is_binary(Name) ->
|
||||||
#topic{name = Name, type = Type, node = node()}.
|
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%% @doc
|
|
||||||
%% Is Wildcard Topic.
|
|
||||||
%%
|
|
||||||
%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
-spec wildcard(topic() | binary()) -> true | false.
|
|
||||||
wildcard(#topic{name = Name}) when is_binary(Name) ->
|
|
||||||
wildcard(Name);
|
wildcard(Name);
|
||||||
wildcard(Topic) when is_binary(Topic) ->
|
wildcard(Topic) when is_binary(Topic) ->
|
||||||
wildcard2(words(Topic)).
|
wildcard(words(Topic));
|
||||||
|
wildcard([]) ->
|
||||||
wildcard2([]) ->
|
|
||||||
false;
|
false;
|
||||||
wildcard2(['#'|_]) ->
|
wildcard(['#'|_]) ->
|
||||||
true;
|
true;
|
||||||
wildcard2(['+'|_]) ->
|
wildcard(['+'|_]) ->
|
||||||
true;
|
true;
|
||||||
wildcard2([_H |T]) ->
|
wildcard([_H|T]) ->
|
||||||
wildcard2(T).
|
wildcard(T).
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -130,7 +121,7 @@ validate({filter, Topic}) when is_binary(Topic) ->
|
||||||
validate2(words(Topic));
|
validate2(words(Topic));
|
||||||
validate({name, Topic}) when is_binary(Topic) ->
|
validate({name, Topic}) when is_binary(Topic) ->
|
||||||
Words = words(Topic),
|
Words = words(Topic),
|
||||||
validate2(Words) and (not include_wildcard(Words)).
|
validate2(Words) and (not wildcard(Words)).
|
||||||
|
|
||||||
validate2([]) ->
|
validate2([]) ->
|
||||||
true;
|
true;
|
||||||
|
@ -155,11 +146,6 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
|
||||||
validate3(<<_/utf8, Rest/binary>>) ->
|
validate3(<<_/utf8, Rest/binary>>) ->
|
||||||
validate3(Rest).
|
validate3(Rest).
|
||||||
|
|
||||||
include_wildcard([]) -> false;
|
|
||||||
include_wildcard(['#'|_T]) -> true;
|
|
||||||
include_wildcard(['+'|_T]) -> true;
|
|
||||||
include_wildcard([ _ | T]) -> include_wildcard(T).
|
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Topic to Triples.
|
%% Topic to Triples.
|
||||||
|
@ -202,4 +188,3 @@ word(<<"+">>) -> '+';
|
||||||
word(<<"#">>) -> '#';
|
word(<<"#">>) -> '#';
|
||||||
word(Bin) -> Bin.
|
word(Bin) -> Bin.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue